You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 00:19:12 UTC

[1/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate

Repository: drill
Updated Branches:
  refs/heads/master be43a9edd -> c16e5f807


http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 7cc43ad..21d5a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,8 +24,8 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
@@ -40,13 +40,17 @@ public interface HashAggregator {
       new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
 
   public static enum AggOutcome {
-    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
+    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
   }
 
+  // For returning results from outputCurrentBatch
+  // OK - batch returned, NONE - end of data, RESTART - call again
+  public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
+
   public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-      OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
-      LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
-      VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
+                             OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+                             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
+                             VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
 
   public abstract IterOutcome getOutcome();
 
@@ -60,6 +64,9 @@ public interface HashAggregator {
 
   public abstract boolean buildComplete();
 
-  public abstract IterOutcome outputCurrentBatch();
+  public abstract AggIterOutcome outputCurrentBatch();
+
+  public abstract boolean earlyOutput();
 
+  public abstract RecordBatch getNewIncoming();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
new file mode 100644
index 0000000..b05353e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * A class to replace "incoming" - instead scanning a spilled partition file
+ */
+public class SpilledRecordbatch implements CloseableRecordBatch {
+  private VectorContainer container;
+  private InputStream spillStream;
+  private int spilledBatches;
+  private FragmentContext context;
+  private BatchSchema schema;
+  private OperatorContext oContext;
+  private SpillSet spillSet;
+  // Path spillStreamPath;
+  private String spillFile;
+  VectorAccessibleSerializable vas;
+
+  public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+    this.context = context;
+    this.schema = schema;
+    this.spilledBatches = spilledBatches;
+    this.oContext = oContext;
+    this.spillSet = spillSet;
+    //this.spillStreamPath = spillStreamPath;
+    this.spillFile = spillFile;
+    vas = new VectorAccessibleSerializable(oContext.getAllocator());
+    container = vas.get();
+
+    try {
+      this.spillStream = this.spillSet.openForInput(spillFile);
+    } catch (IOException e) { throw new RuntimeException(e);}
+
+    next(); // initialize the container
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() { return context; }
+
+  @Override
+  public BatchSchema getSchema() { return schema; }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public int getRecordCount() { return container.getRecordCount(); }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+    this.close(); // delete the current spill file
+  }
+
+  /**
+   * Read the next batch from the spill file
+   *
+   * @return IterOutcome
+   */
+  @Override
+  public IterOutcome next() {
+
+    if ( spilledBatches <= 0 ) { // no more batches to read in this partition
+      this.close();
+      return IterOutcome.NONE;
+    }
+
+    if ( spillStream == null ) {
+      throw new IllegalStateException("Spill stream was null");
+    };
+
+    if ( spillSet.getPosition(spillStream)  < 0 ) {
+      HashAggTemplate.logger.warn("Position is {} for stream {}", spillSet.getPosition(spillStream), spillStream.toString());
+    }
+
+    try {
+      if ( container.getNumberOfColumns() > 0 ) { // container already initialized
+        // Pass our container to the reader because other classes (e.g. HashAggBatch, HashTable)
+        // may have a reference to this container (as an "incoming")
+        vas.readFromStreamWithContainer(container, spillStream);
+      }
+      else { // first time - create a container
+        vas.readFromStream(spillStream);
+        container = vas.get();
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    }
+
+    spilledBatches-- ; // one less batch to read
+    return IterOutcome.OK;
+  }
+
+  @Override
+  public void close() {
+    container.clear();
+    try {
+      if (spillStream != null) {
+        spillStream.close();
+        spillStream = null;
+      }
+
+      spillSet.delete(spillFile);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      spillSet.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 77ebb0d..436480e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -114,7 +114,7 @@ public class ChainedHashTable {
   private HashTableConfig htConfig;
   private final FragmentContext context;
   private final BufferAllocator allocator;
-  private final RecordBatch incomingBuild;
+  private RecordBatch incomingBuild;
   private final RecordBatch incomingProbe;
   private final RecordBatch outgoing;
 
@@ -129,14 +129,18 @@ public class ChainedHashTable {
     this.outgoing = outgoing;
   }
 
-  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
+  public void updateIncoming(RecordBatch incomingBuild) {
+    this.incomingBuild = incomingBuild;
+  }
+
+  public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException,
       IOException, SchemaChangeException {
     CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     top.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
     // This code is called from generated code, so to step into this code,
     // persist the code generated in HashAggBatch also.
-//  top.saveCodeForDebugging(true);
+    // top.saveCodeForDebugging(true);
     ClassGenerator<HashTable> cg = top.getRoot();
     ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ef7dadf..9c93c16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.common;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -43,7 +44,7 @@ public interface HashTable {
    */
   static final public float DEFAULT_LOAD_FACTOR = 0.75f;
 
-  static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;}
+  static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
 
   /**
    * The batch size used for internal batch holders
@@ -51,30 +52,35 @@ public interface HashTable {
   static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
   static final public int BATCH_MASK = 0x0000FFFF;
 
-  /** Variable width vector size in bytes */
-  public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
+  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
 
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
-      RecordBatch outgoing, VectorContainer htContainerOrig);
+  public void updateBatches() throws SchemaChangeException;
 
-  public void updateBatches();
+  public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
 
-  public int containsKey(int incomingRowIdx, boolean isProbe);
+  public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
 
   public void getStats(HashTableStats stats);
 
+  public long extraMemoryNeededForResize();
+
   public int size();
 
   public boolean isEmpty();
 
   public void clear();
 
+  public void reinit(RecordBatch newIncoming);
+
+  public void reset();
+
+  public void setMaxVarcharSize(int size);
+
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
 
-  public void addNewKeyBatch();
+  // public void addNewKeyBatch();
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
index c494c85..7baa9d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
@@ -26,6 +26,13 @@ public class HashTableStats {
 
   public HashTableStats() {
   }
+
+  public void addStats (HashTableStats newStats) {
+    this.numBuckets += newStats.numBuckets ;
+    this.numEntries += newStats.numEntries ;
+    this.numResizing += newStats.numResizing ;
+    this.resizingTime += newStats.resizingTime ;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 96f9422..3209c27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -25,6 +25,7 @@ import javax.inject.Named;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -50,14 +51,19 @@ public abstract class HashTableTemplate implements HashTable {
   // A hash 'bucket' consists of the start index to indicate start of a hash chain
 
   // Array of start indexes. start index is a global index across all batch holders
+  //  This is the "classic hash table", where  Hash-Value % size-of-table  yields
+  // the offset/position (in the startIndices) of the beginning of the hash chain.
   private IntVector startIndices;
 
   // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries
   private ArrayList<BatchHolder> batchHolders;
 
-  // Size of the hash table in terms of number of buckets
+  // Current size of the hash table in terms of number of buckets
   private int tableSize = 0;
 
+  // Original size of the hash table (needed when re-initializing)
+  private int originalTableSize;
+
   // Threshold after which we rehash; It must be the tableSize * loadFactor
   private int threshold;
 
@@ -95,6 +101,8 @@ public abstract class HashTableTemplate implements HashTable {
 
   private int resizingTime = 0;
 
+  private int maxVarcharSize = 8; // for varchar allocation
+
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -134,7 +142,9 @@ public abstract class HashTableTemplate implements HashTable {
           if (vv instanceof FixedWidthVector) {
             ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
           } else if (vv instanceof VariableWidthVector) {
-            ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+            long beforeMem = allocator.getAllocatedMemory();
+            ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE);
+            logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize);
           } else {
             vv.allocateNew();
           }
@@ -166,7 +176,7 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues.getMutator().setValueCount(size);
     }
 
-    protected void setup() {
+    protected void setup() throws SchemaChangeException {
       setupInterior(incomingBuild, incomingProbe, outgoing, htContainer);
     }
 
@@ -175,7 +185,7 @@ public abstract class HashTableTemplate implements HashTable {
     // currentIdxHolder with the index of the next link.
     private boolean isKeyMatch(int incomingRowIdx,
         IndexPointer currentIdxHolder,
-        boolean isProbe) {
+        boolean isProbe) throws SchemaChangeException {
 
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
       boolean match = false;
@@ -201,7 +211,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
     // container at the specified index
-    private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
+    private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
 
       setValue(incomingRowIdx, currentIdxWithinBatch);
@@ -405,36 +415,34 @@ public abstract class HashTableTemplate implements HashTable {
         @Named("incomingBuild") RecordBatch incomingBuild,
         @Named("incomingProbe") RecordBatch incomingProbe,
         @Named("outgoing") RecordBatch outgoing,
-        @Named("htContainer") VectorContainer htContainer) {
+        @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
     protected boolean isKeyMatchInternalBuild(
-        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
       return false;
     }
 
     @RuntimeOverridden
     protected boolean isKeyMatchInternalProbe(
-        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+        @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
       return false;
     }
 
     @RuntimeOverridden
-    protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
-    protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
 
   } // class BatchHolder
 
 
   @Override
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe,
-      RecordBatch outgoing, VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -465,6 +473,7 @@ public abstract class HashTableTemplate implements HashTable {
     if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
     }
+    originalTableSize = tableSize ; // retain original size
 
     threshold = (int) Math.ceil(tableSize * loadf);
 
@@ -476,13 +485,17 @@ public abstract class HashTableTemplate implements HashTable {
     batchHolders = new ArrayList<BatchHolder>();
     // First BatchHolder is created when the first put request is received.
 
-    doSetup(incomingBuild, incomingProbe);
+    try {
+      doSetup(incomingBuild, incomingProbe);
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("Unexpected schema change", e);
+    }
 
     currentIdxHolder = new IndexPointer();
   }
 
   @Override
-  public void updateBatches() {
+  public void updateBatches() throws SchemaChangeException {
     doSetup(incomingBuild, incomingProbe);
     for (BatchHolder batchHolder : batchHolders) {
       batchHolder.setup();
@@ -497,6 +510,21 @@ public abstract class HashTableTemplate implements HashTable {
     return numResizing;
   }
 
+  /**
+   *
+   * @return Size of extra memory needed if the HT (i.e. startIndices) is doubled
+   */
+  @Override
+  public long extraMemoryNeededForResize() {
+    if (tableSize == MAXIMUM_CAPACITY) { return 0; } // will not resize
+    int newSize = roundUpToPowerOf2(2 * tableSize);
+
+    if (newSize > MAXIMUM_CAPACITY) {
+      newSize  = MAXIMUM_CAPACITY;
+    }
+    return newSize * 4 /* sizeof(int) */;
+  }
+
   @Override
   public int size() {
     return numEntries;
@@ -526,7 +554,7 @@ public abstract class HashTableTemplate implements HashTable {
       batchHolders = null;
     }
     startIndices.clear();
-    currentIdxHolder = null;
+    // currentIdxHolder = null; // keep IndexPointer in case HT is reused
     numEntries = 0;
   }
 
@@ -544,86 +572,69 @@ public abstract class HashTableTemplate implements HashTable {
     return rounded;
   }
 
-  @Override
-  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
-    put(incomingRowIdx, htIdxHolder);
+  public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+    return getHashBuild(incomingRowIdx);
   }
 
-  private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
+  /** put() uses the hash code (from gethashCode() above) to insert the key(s) from the incoming
+   * row into the hash table. The code selects the bucket in the startIndices, then the keys are
+   * placed into the chained list - by storing the key values into a batch, and updating its
+   * "links" member. Last it modifies the index holder to the batch offset so that the caller
+   * can store the remaining parts of the row into a matching batch (outside the hash table).
+   * Returning
+   *
+   * @param incomingRowIdx - position of the incoming row
+   * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch)
+   * @param hashCode - computed over the key(s) by calling getHashCode()
+   * @return Status - the key(s) was ADDED or was already PRESENT
+   */
+  @Override
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
 
-    int hash = getHashBuild(incomingRowIdx);
-    int i = getBucketIndex(hash, numBuckets());
-    int startIdx = startIndices.getAccessor().get(i);
+    int bucketIndex = getBucketIndex(hashCode, numBuckets());
+    int startIdx = startIndices.getAccessor().get(bucketIndex);
     int currentIdx;
-    int currentIdxWithinBatch;
-    BatchHolder bh;
     BatchHolder lastEntryBatch = null;
     int lastEntryIdxWithinBatch = EMPTY_SLOT;
 
+    // if startIdx is non-empty, follow the hash chain links until we find a matching
+    // key or reach the end of the chain (and remember the last link there)
+    for ( currentIdxHolder.value = startIdx;
+          currentIdxHolder.value != EMPTY_SLOT;
+          /* isKeyMatch() below also advances the currentIdxHolder to the next link */) {
 
-    if (startIdx == EMPTY_SLOT) {
-      // this is the first entry in this bucket; find the first available slot in the
-      // container of keys and values
-      currentIdx = freeIndex++;
-      addBatchIfNeeded(currentIdx);
+      // remember the current link, which would be the last when the next link is empty
+      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
+      lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
 
-      if (EXTRA_DEBUG) {
-        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i,
-            incomingRowIdx, currentIdx);
+      if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
+        htIdxHolder.value = currentIdxHolder.value;
+        return PutStatus.KEY_PRESENT;
       }
-
-      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
-      // update the start index array
-      startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
-      htIdxHolder.value = currentIdx;
-      return PutStatus.KEY_ADDED;
     }
 
-    currentIdx = startIdx;
-    boolean found = false;
-
-    bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-    currentIdxHolder.value = currentIdx;
-
-    // if startIdx is non-empty, follow the hash chain links until we find a matching
-    // key or reach the end of the chain
-    while (true) {
-      currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
+    // no match was found, so insert a new entry
+    currentIdx = freeIndex++;
+    boolean addedBatch = addBatchIfNeeded(currentIdx);
 
-      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
-        htIdxHolder.value = currentIdxHolder.value;
-        found = true;
-        break;
-      } else if (currentIdxHolder.value == EMPTY_SLOT) {
-        lastEntryBatch = bh;
-        lastEntryIdxWithinBatch = currentIdxWithinBatch;
-        break;
-      } else {
-        bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
-        lastEntryBatch = bh;
-      }
+    if (EXTRA_DEBUG) {
+      logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
     }
 
-    if (!found) {
-      // no match was found, so insert a new entry
-      currentIdx = freeIndex++;
-      addBatchIfNeeded(currentIdx);
+    insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
 
-      if (EXTRA_DEBUG) {
-        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
-      }
-
-      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
-      htIdxHolder.value = currentIdx;
-      return PutStatus.KEY_ADDED;
+    // if there was no hash chain at this bucket, need to update the start index array
+    if (startIdx == EMPTY_SLOT) {
+      startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
     }
-
-    return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
+    htIdxHolder.value = currentIdx;
+    return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
+        ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
+        PutStatus.KEY_ADDED_LAST : // the last key in the batch
+        PutStatus.KEY_ADDED;     // otherwise
   }
 
-  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
-
-    addBatchIfNeeded(currentIdx);
+  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException {
 
     BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
 
@@ -640,60 +651,39 @@ public abstract class HashTableTemplate implements HashTable {
 
   // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
   @Override
-  public int containsKey(int incomingRowIdx, boolean isProbe) {
+  public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
     int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
-    int i = getBucketIndex(hash, numBuckets());
-
-    int currentIdx = startIndices.getAccessor().get(i);
-
-    if (currentIdx == EMPTY_SLOT) {
-      return -1;
-    }
-
-    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-    currentIdxHolder.value = currentIdx;
+    int bucketIndex = getBucketIndex(hash, numBuckets());
 
-    boolean found = false;
-
-    while (true) {
+    for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex);
+          currentIdxHolder.value != EMPTY_SLOT; ) {
+      BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
       if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
-        found = true;
-        break;
-      } else if (currentIdxHolder.value == EMPTY_SLOT) {
-        break;
-      } else {
-        bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
+        return currentIdxHolder.value;
       }
     }
-
-    return found ? currentIdxHolder.value : -1;
+    return -1;
   }
 
   // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
-  // the capacity, we will add a new BatchHolder.
-  private BatchHolder addBatchIfNeeded(int currentIdx) {
+  // the capacity, we will add a new BatchHolder. Return true if a new batch was added.
+  private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException {
     int totalBatchSize = batchHolders.size() * BATCH_SIZE;
 
     if (currentIdx >= totalBatchSize) {
-      BatchHolder bh = addBatchHolder();
+      BatchHolder bh = newBatchHolder(batchHolders.size());
+      batchHolders.add(bh);
+      bh.setup();
       if (EXTRA_DEBUG) {
         logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
       }
-      return bh;
-    } else {
-      return batchHolders.get(batchHolders.size() - 1);
+      return true;
     }
+    return false;
   }
 
-  private BatchHolder addBatchHolder() {
-    BatchHolder bh = newBatchHolder(batchHolders.size());
-    batchHolders.add(bh);
-    bh.setup();
-    return bh;
-  }
-
-  protected BatchHolder newBatchHolder(int index) {
+  protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code
     return new BatchHolder(index);
   }
 
@@ -755,6 +745,34 @@ public abstract class HashTableTemplate implements HashTable {
     numResizing++;
   }
 
+  /**
+   * Reinit the hash table to its original size, and clear up all its prior batch holder
+   *
+   */
+  public void reset() {
+    // long before = allocator.getAllocatedMemory();
+    this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
+    // long after = allocator.getAllocatedMemory();
+
+    // logger.debug("Reinit Hash Table: Memory before {} After {}  Percent after: {}",before,after, (100 * after ) / before);
+
+    freeIndex = 0; // all batch holders are gone
+    // reallocate batch holders, and the hash table to the original size
+    batchHolders = new ArrayList<BatchHolder>();
+    startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
+  }
+  public void reinit(RecordBatch newIncoming) {
+    incomingBuild = newIncoming;
+    reset();
+    try {
+      updateBatches();  // Needed ? (to update the new incoming?)
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("Unexpected schema change", e);
+    } catch(IndexOutOfBoundsException ioob) {
+      throw new IllegalStateException("reinit update batches", ioob);
+    }
+  }
+
   @Override
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
     assert batchIdx < batchHolders.size();
@@ -775,17 +793,20 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
+  public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
+
+/*  @Override
   public void addNewKeyBatch() {
     int numberOfBatches = batchHolders.size();
     this.addBatchHolder();
     freeIndex = numberOfBatches * BATCH_SIZE;
   }
-
+*/
   // These methods will be code-generated in the context of the outer class
-  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
+  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
 
-  protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx);
+  protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
 
-  protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx);
+  protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index e2c016b..4af1664 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -315,7 +315,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     // Create the chained hash table
     final ChainedHashTable ht =
         new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
-    hashTable = ht.createAndSetupHashTable(null);
+    hashTable = ht.createAndSetupHashTable(null, 1);
   }
 
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
@@ -374,7 +374,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
         // For every record in the build batch , hash the key columns
         for (int i = 0; i < currentRecordCount; i++) {
-          hashTable.put(i, htIndex, 1 /* retry count */);
+          int hashCode = hashTable.getHashCode(i);
+          hashTable.put(i, htIndex, hashCode);
 
                         /* Use the global index returned by the hash table, to store
                          * the current record index and batch index. This will be used

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 4cb2bae..a1b8169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -68,14 +69,14 @@ public class RecordBatchSizer {
     public int capacity;
     public int density;
     public int dataSize;
+    public boolean variableWidth;
 
-    public ColumnSize(ValueVector v) {
-      metadata = v.getField();
+    public ColumnSize(ValueVector vv) {
+      metadata = vv.getField();
       stdSize = TypeHelper.getSize(metadata.getType());
 
       // Can't get size estimates if this is an empty batch.
-
-      int rowCount = v.getAccessor().getValueCount();
+      int rowCount = vv.getAccessor().getValueCount();
       if (rowCount == 0) {
         estSize = stdSize;
         return;
@@ -84,17 +85,17 @@ public class RecordBatchSizer {
       // Total size taken by all vectors (and underlying buffers)
       // associated with this vector.
 
-      totalSize = v.getAllocatedByteCount();
+      totalSize = vv.getAllocatedByteCount();
 
       // Capacity is the number of values that the vector could
       // contain. This is useful only for fixed-length vectors.
 
-      capacity = v.getValueCapacity();
+      capacity = vv.getValueCapacity();
 
       // The amount of memory consumed by the payload: the actual
       // data stored in the vectors.
 
-      dataSize = v.getPayloadByteCount();
+      dataSize = vv.getPayloadByteCount();
 
       // Determine "density" the number of rows compared to potential
       // capacity. Low-density batches occur at block boundaries, ends
@@ -105,6 +106,7 @@ public class RecordBatchSizer {
 
       density = roundUp(dataSize * 100, totalSize);
       estSize = roundUp(dataSize, rowCount);
+      variableWidth = vv instanceof VariableWidthVector ;
     }
 
     @Override
@@ -155,6 +157,7 @@ public class RecordBatchSizer {
    * vectors are partially full; prevents overestimating row width.
    */
   private int netRowWidth;
+  private int netRowWidthCap50;
   private boolean hasSv2;
   private int sv2Size;
   private int avgDensity;
@@ -167,6 +170,18 @@ public class RecordBatchSizer {
          batch.getSelectionVector2() : null);
   }
 
+  /**
+   *  Maximum width of a column; used for memory estimation in case of Varchars
+   */
+  public int maxSize;
+  /**
+   *  Count the nullable columns; used for memory estimation
+   */
+  public int numNullables;
+  /**
+   *
+   * @param va
+   */
   public RecordBatchSizer(VectorAccessible va) {
     this(va, null);
   }
@@ -174,7 +189,9 @@ public class RecordBatchSizer {
   public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
-      measureColumn(vw);
+      int size = measureColumn(vw.getValueVector());
+      if ( size > maxSize ) { maxSize = size; }
+      if ( vw.getField().isNullable() ) { numNullables++; }
     }
 
     if (rowCount > 0) {
@@ -208,32 +225,45 @@ public class RecordBatchSizer {
     totalBatchSize += sv2Size;
   }
 
-  private void measureColumn(VectorWrapper<?> vw) {
-    measureColumn(vw.getValueVector());
+  /**
+   *  Round up (if needed) to the next power of 2 (only up to 64)
+   * @param arg Number to round up (must be < 64)
+   * @return power of 2 result
+   */
+  private int roundUpToPowerOf2(int arg) {
+    if ( arg <= 2 ) { return 2; }
+    if ( arg <= 4 ) { return 4; }
+    if ( arg <= 8 ) { return 8; }
+    if ( arg <= 16 ) { return 16; }
+    if ( arg <= 32 ) { return 32; }
+    return 64;
   }
-
-  private void measureColumn(ValueVector v) {
-
+  private int measureColumn(ValueVector vv) {
     // Maps consume no size themselves. However, their contained
     // vectors do consume space, so visit columns recursively.
-
-    if (v.getField().getType().getMinorType() == MinorType.MAP) {
-      expandMap((AbstractMapVector) v);
-      return;
+    if (vv.getField().getType().getMinorType() == MinorType.MAP) {
+      return expandMap((AbstractMapVector) vv);
     }
-    ColumnSize colSize = new ColumnSize(v);
+
+    ColumnSize colSize = new ColumnSize(vv);
     columnSizes.add(colSize);
 
     stdRowWidth += colSize.stdSize;
     totalBatchSize += colSize.totalSize;
     netBatchSize += colSize.dataSize;
     netRowWidth += colSize.estSize;
+    netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize :
+        8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) );
+        // above change 8 to 4 after DRILL-5446 is fixed
+    return colSize.estSize;
   }
 
-  private void expandMap(AbstractMapVector mapVector) {
+  private int expandMap(AbstractMapVector mapVector) {
+    int accum = 0;
     for (ValueVector vector : mapVector) {
-      measureColumn(vector);
+      accum += measureColumn(vector);
     }
+    return accum;
   }
 
   public static int roundUp(int num, int denom) {
@@ -247,10 +277,18 @@ public class RecordBatchSizer {
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
   public int netRowWidth() { return netRowWidth; }
+  /**
+   * Compute the "real" width of the row, taking into account each varchar column size
+   * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
+   * and null marking columns.
+   * @return "real" width of the row
+   */
+  public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; }
   public int actualSize() { return totalBatchSize; }
   public boolean hasSv2() { return hasSv2; }
   public int avgDensity() { return avgDensity; }
   public int netSize() { return netBatchSize; }
+  public int maxSize() { return maxSize; }
 
   public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 74e1fb5..87eebc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -30,11 +30,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -84,7 +86,7 @@ public class SpillSet {
      * Given a manager-specific input stream, return the current read position.
      * Used to report total read bytes.
      *
-     * @param outputStream input stream created by the file manager
+     * @param inputStream input stream created by the file manager
      * @return
      */
     long getReadBytes(InputStream inputStream);
@@ -346,7 +348,6 @@ public class SpillSet {
    */
 
   private final String spillDirName;
-  private final String spillFileName;
 
   private int fileCount = 0;
 
@@ -356,16 +357,34 @@ public class SpillSet {
 
   private long writeBytes;
 
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
-    this(context, popConfig, null, "spill");
-  }
-
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig,
-                  String opName, String fileName) {
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) {
     FragmentHandle handle = context.getHandle();
+    String operName = "Unknown";
+
+    // Set the spill options from the configuration
     DrillConfig config = context.getConfig();
-    spillFileName = fileName;
-    List<String> dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+    String spillFs;
+    List<String> dirList;
+
+    // Set the operator name (used as part of the spill file name),
+    // and set oper. specific options (the config file defaults to using the
+    // common options; users may override those - per operator)
+    switch (optype) {
+      case EXTERNAL_SORT:
+        operName = "Sort";
+        spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+        break;
+      case HASH_AGGREGATE:
+        operName = "HashAgg";
+        spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
+        break;
+      default: // just use the common ones
+        spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM);
+        dirList = config.getStringList(ExecConstants.SPILL_DIRS);
+    }
+
     dirs = Iterators.cycle(dirList);
 
     // If more than one directory, semi-randomly choose an offset into
@@ -386,23 +405,18 @@ public class SpillSet {
     // system is selected and impersonation is off. (We use that
     // as a proxy for a non-production Drill setup.)
 
-    String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
     boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
     if (spillFs.startsWith("file:///") && ! impersonationEnabled) {
       fileManager = new LocalFileManager(spillFs);
     } else {
       fileManager = new HadoopFileManager(spillFs);
     }
-    spillDirName = String.format(
-        "%s_major%d_minor%d_op%d%s",
-        QueryIdHelper.getQueryId(handle.getQueryId()),
-        handle.getMajorFragmentId(),
-        handle.getMinorFragmentId(),
-        popConfig.getOperatorId(),
-        (opName == null) ? "" : "_" + opName);
+
+    spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+        operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
   }
 
-  public String getNextSpillFile() {
+  public String getNextSpillFile(String extraName) {
 
     // Identify the next directory from the round-robin list to
     // the file created from this round of spilling. The directory must
@@ -411,7 +425,12 @@ public class SpillSet {
     String spillDir = dirs.next();
     String currSpillPath = Joiner.on("/").join(spillDir, spillDirName);
     currSpillDirs.add(currSpillPath);
-    String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount);
+
+    String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
+    if ( extraName != null ) {
+      outputFile += "_" + extraName;
+    }
+
     try {
         fileManager.deleteOnExit(currSpillPath);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 69e9b4c..4d5f290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
+
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -399,7 +401,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     allocator = oContext.getAllocator();
     opCodeGen = new OperatorCodeGenerator(context, popConfig);
 
-    spillSet = new SpillSet(context, popConfig, "sort", "run");
+    spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT);
     copierHolder = new CopierHolder(context, allocator, opCodeGen);
     configure(context.getConfig());
   }
@@ -1390,7 +1392,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // spill file. After each write, we release the memory associated
     // with the just-written batch.
 
-    String outputFile = spillSet.getNextSpillFile();
+    String outputFile = spillSet.getNextSpillFile(null);
     stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
     BatchGroup.SpilledRun newGroup = null;
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 732ff15..8c69930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -51,7 +51,7 @@ import java.util.List;
 
 public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
 
-  protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+  public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
 
   protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
   protected List<NamedExpression> keys = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..460ee8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -61,6 +61,9 @@ public abstract class AggPruleBase extends Prule {
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr
+      return true;
+    }
     RelNode child = call.rel(0).getInputs().get(0);
     boolean smallInput = child.getRows() < settings.getSliceTarget();
     if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index c382af6..09d33fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -73,7 +73,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
 
     Prel child = (Prel) this.getInput();
-    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys, aggExprs, 1.0f);
+    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), operPhase, keys, aggExprs, 1.0f);
 
     return creator.addMetadata(this, g);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 648adb7..15314ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -133,6 +133,9 @@ public class PlannerSettings implements Context{
      the need to turn off join optimization may go away.
    */
   public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true);
+  // for testing purpose
+  public static final String FORCE_2PHASE_AGGR_KEY = "planner.force_2phase_aggr";
+  public static final BooleanValidator FORCE_2PHASE_AGGR = new BooleanValidator(FORCE_2PHASE_AGGR_KEY, false);
 
   public OptionManager options = null;
   public FunctionImplementationRegistry functionImplementationRegistry = null;
@@ -274,6 +277,8 @@ public class PlannerSettings implements Context{
     return options.getOption(TYPE_INFERENCE);
   }
 
+  public boolean isForce2phaseAggr() { return options.getOption(FORCE_2PHASE_AGGR);} // for testing
+
   public long getInSubqueryThreshold() {
     return options.getOption(IN_SUBQUERY_THRESHOLD);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 04cf8fc..0daa6b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  * <p>
  *   A key thing to know is that the Iterator provided by a record batch must
  *   align with the rank positions of the field IDs provided using
- *   {@link getValueVectorId}.
+ *   {@link #getValueVectorId}.
  * </p>
  */
 public interface RecordBatch extends VectorAccessible {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8492f36..c2a4d65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -93,6 +93,10 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
       PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD,
       PlannerSettings.QUOTING_IDENTIFIERS,
       PlannerSettings.JOIN_OPTIMIZATION,
+      PlannerSettings.FORCE_2PHASE_AGGR, // for testing
+      ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR,
+      ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR,
+      ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index d06424e..79b49e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.server.options.OptionManager;
 
 public class MemoryAllocationUtilities {
@@ -40,7 +39,7 @@ public class MemoryAllocationUtilities {
    * @param plan
    * @param queryContext
    */
-  public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+  public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
 
     // Test plans may already have a pre-defined memory plan.
     // Otherwise, determine memory allocation.
@@ -49,30 +48,30 @@ public class MemoryAllocationUtilities {
       return;
     }
     // look for external sorts
-    final List<ExternalSort> sortList = new LinkedList<>();
+    final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
     for (final PhysicalOperator op : plan.getSortedOperators()) {
-      if (op instanceof ExternalSort) {
-        sortList.add((ExternalSort) op);
+      if ( op.isBufferedOperator() ) {
+        bufferedOpList.add(op);
       }
     }
 
     // if there are any sorts, compute the maximum allocation, and set it on them
-    if (sortList.size() > 0) {
+    if (bufferedOpList.size() > 0) {
       final OptionManager optionManager = queryContext.getOptions();
       final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
       long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
           queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
       maxAllocPerNode = Math.min(maxAllocPerNode,
           optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
-      final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
-      logger.debug("Max sort alloc: {}", maxSortAlloc);
+      final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
+      logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
 
-      for(final ExternalSort externalSort : sortList) {
+      for(final PhysicalOperator op : bufferedOpList) {
         // Ensure that the sort receives the minimum memory needed to make progress.
         // Without this, the math might work out to allocate too little memory.
 
-        long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation());
-        externalSort.setMaxAllocation(alloc);
+        long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
+        op.setMaxAllocation(alloc);
       }
     }
     plan.getProperties().hasResourcePlan = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5e5fef0..62c2307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -433,7 +433,7 @@ public class Foreman implements Runnable {
 
   private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
     validatePlan(plan);
-    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
     //Marking endTime of Planning
     queryManager.markPlanningEndTime();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 7ffb224..2f945d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -97,7 +97,7 @@ public class PlanSplitter {
       throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
     }
 
-    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+    MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
 
     final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c2a2bf0..8aedaf6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -207,6 +207,33 @@ drill.exec: {
     // java ... -ea -Ddrill.exec.debug.validate_vectors=true ...
     validate_vectors: false
   },
+  spill: {
+    // *** Options common to all the operators that may spill
+    // File system to use. Local file system by default.
+    fs: "file:///",
+    // List of directories to use. Directories are created
+    // if they do not exist.
+    directories: [ "/tmp/drill/spill" ]
+  },
+  hashagg: {
+    // An internal tuning; should not be changed
+    min_batches_per_partition: 3,
+    // An option for testing - force a memory limit
+    mem_limit: 0,
+    // The max number of partitions in each hashagg operator
+    // This number is tuned down when memory is limited
+    // Setting it to 1 means: No spilling
+    num_partitions: 32,
+    spill: {
+        // -- The 2 options below can be used to override the common ones
+        // -- (common to all spilling operators)
+        // File system to use. Local file system by default.
+        fs: ${drill.exec.spill.fs},
+        // List of directories to use. Directories are created
+        // if they do not exist.
+        directories:  ${drill.exec.spill.directories},
+    }
+  },
   sort: {
     purge.threshold : 1000,
     external: {
@@ -232,11 +259,15 @@ drill.exec: {
         group.size: 40000,
         // Deprecated for managed xsort; used only by legacy xsort
         threshold: 40000,
+        // -- The two options below can be used to override the options common
+        // -- for all spilling operators (see "spill" above).
+        // -- This is done for backward compatibility; in the future they
+        // -- would be deprecated (you should be using only the common ones)
         // File system to use. Local file system by default.
-        fs: "file:///"
+        fs: ${drill.exec.spill.fs},
         // List of directories to use. Directories are created
         // if they do not exist.
-        directories: [ "/tmp/drill/spill" ],
+        directories:  ${drill.exec.spill.directories},
         // Size of the batches written to, and read from, the spill files.
         // Determines the ratio of memory to input data size for a single-
         // generation sort. Smaller values give larger ratios, but at a

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 27df710..1a4d63b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -211,12 +211,13 @@ public class TestBugFixes extends BaseTestQuery {
     int limit = 65536;
     ImmutableList.Builder<Map<String, Object>> baselineBuilder = ImmutableList.builder();
     for (int i = 0; i < limit; i++) {
-      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", String.valueOf(i + 1)));
+      baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", /*String.valueOf */ (i + 1)));
     }
     List<Map<String, Object>> baseline = baselineBuilder.build();
 
     testBuilder()
-            .sqlQuery(String.format("select id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id limit %s", TEST_RES_PATH, limit))
+            .sqlQuery(String.format("select cast(id as int) as id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id order by 1 limit %s",
+                TEST_RES_PATH, limit))
             .unOrdered()
             .baselineRecords(baseline)
             .go();

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index 66b7571..f15e757 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
  * any particular order of execution. We ignore the results.
  */
 public class TestTpchDistributedConcurrent extends BaseTestQuery {
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // Longer timeout than usual.
 
   /*
    * Valid test names taken from TestTpchDistributed. Fuller path prefixes are

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
new file mode 100644
index 0000000..fe6fcbc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.agg;
+
+import ch.qos.logback.classic.Level;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Test spilling for the Hash Aggr operator (using the mock reader)
+ */
+public class TestHashAggrSpill extends BaseTestQuery {
+
+    private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception {
+        String plan = client.queryBuilder().sql(sql).explainJson();
+
+        QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+        if ( expectedRows > 0 ) {
+            assertEquals(expectedRows, summary.recordCount());
+        }
+        // System.out.println(String.format("======== \n Results: %,d records, %d batches, %,d ms\n ========", summary.recordCount(), summary.batchCount(), summary.runTimeMs() ) );
+
+        //System.out.println("Query ID: " + summary.queryIdString());
+        ProfileParser profile = client.parseProfile(summary.queryIdString());
+        //profile.print();
+        List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
+
+        assertTrue( ! ops.isEmpty() );
+        // check for the first op only
+        ProfileParser.OperatorProfile hag0 = ops.get(0);
+        long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+        assertEquals(spillCycle, opCycle);
+        long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+        assertEquals(spilledPartitions, op_spilled_partitions);
+        /* assertEquals(3, ops.size());
+        for ( int i = 0; i < ops.size(); i++ ) {
+            ProfileParser.OperatorProfile hag = ops.get(i);
+            long cycle = hag.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+            long num_partitions = hag.getMetric(HashAggTemplate.Metric.NUM_PARTITIONS.ordinal());
+            long spilled_partitions = hag.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+            long mb_spilled = hag.getMetric(HashAggTemplate.Metric.SPILL_MB.ordinal());
+            System.out.println(String.format("(%d) Spill cycle: %d, num partitions: %d, spilled partitions: %d, MB spilled: %d", i,cycle, num_partitions, spilled_partitions,
+                mb_spilled));
+        } */
+    }
+
+    /**
+     * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
+     * ("normal spill" means spill-cycle = 1 )
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrSpill() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+            .maxParallelization(2)
+            .saveProfiles()
+            //.keepLocalFiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+            runAndDump(client, sql, 1_200_000, 1, 1);
+        }
+    }
+
+    /**
+     *  Test "secondary" spilling -- Some of the spilled partitions cause more spilling as they are read back
+     *  (Hence spill-cycle = 2 )
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrSecondaryTertiarySpill() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+            .logger("org.apache.drill.exec.cache", Level.INFO)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false)
+            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+            .maxParallelization(1)
+            .saveProfiles()
+            //.keepLocalFiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i";
+            runAndDump(client, sql, 1_100_000, 3, 2);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index e39a644..66588b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -125,7 +126,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
 
   @Test
   public void testSimpleHashAgg() {
-    HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
     List<String> inputJsonBatches = Lists.newArrayList(
         "[{\"a\": 5, \"b\" : 1 }]",
         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index cb0c517..eecbdfa 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -119,6 +119,7 @@
             <exclude>**/.checkstyle</exclude>
             <exclude>**/.buildpath</exclude>
             <exclude>**/*.json</exclude>
+            <exclude>**/*.iml</exclude>
             <exclude>**/git.properties</exclude>
             <exclude>**/donuts-output-data.txt</exclude>
             <exclude>**/*.tbl</exclude>

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index aa713f8..deed7a7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -192,8 +192,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
           r.pBody, r.dBodies);
       if (RpcConstants.EXTRA_DEBUGGING) {
         logger.debug("Adding message to outbound buffer. {}", outMessage);
+        logger.debug("Sending response with Sender {}", System.identityHashCode(this));
       }
-      logger.debug("Sending response with Sender {}", System.identityHashCode(this));
       connection.getChannel().writeAndFlush(outMessage);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 105ea47..581a9f8 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -377,6 +377,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
 
+    logger.trace("Reallocating VarChar, new size {}",newAllocationSize);
     final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();


[2/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate

Posted by pr...@apache.org.
DRILL-5457: Spill implementation for Hash Aggregate

closes #822


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c16e5f80
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c16e5f80
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c16e5f80

Branch: refs/heads/master
Commit: c16e5f8072f3e5d18157767143f9ccc7669c4380
Parents: be43a9e
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jun 19 19:04:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Tue Jun 20 17:01:01 2017 -0700

----------------------------------------------------------------------
 .../src/resources/drill-override-example.conf   |   22 +
 .../org/apache/drill/exec/ExecConstants.java    |   22 +
 .../cache/VectorAccessibleSerializable.java     |   56 +
 .../drill/exec/physical/base/AbstractBase.java  |   28 +-
 .../exec/physical/base/PhysicalOperator.java    |   15 +
 .../exec/physical/config/ExternalSort.java      |   17 +-
 .../exec/physical/config/HashAggregate.java     |   25 +-
 .../physical/impl/aggregate/HashAggBatch.java   |   46 +-
 .../impl/aggregate/HashAggTemplate.java         | 1113 +++++++++++++++---
 .../physical/impl/aggregate/HashAggregator.java |   19 +-
 .../impl/aggregate/SpilledRecordbatch.java      |  175 +++
 .../physical/impl/common/ChainedHashTable.java  |   10 +-
 .../exec/physical/impl/common/HashTable.java    |   26 +-
 .../physical/impl/common/HashTableStats.java    |    7 +
 .../physical/impl/common/HashTableTemplate.java |  255 ++--
 .../exec/physical/impl/join/HashJoinBatch.java  |    5 +-
 .../physical/impl/spill/RecordBatchSizer.java   |   78 +-
 .../exec/physical/impl/spill/SpillSet.java      |   59 +-
 .../impl/xsort/managed/ExternalSortBatch.java   |    6 +-
 .../exec/planner/physical/AggPrelBase.java      |    2 +-
 .../exec/planner/physical/AggPruleBase.java     |    3 +
 .../exec/planner/physical/HashAggPrel.java      |    2 +-
 .../exec/planner/physical/PlannerSettings.java  |    5 +
 .../apache/drill/exec/record/RecordBatch.java   |    2 +-
 .../server/options/SystemOptionManager.java     |    4 +
 .../exec/util/MemoryAllocationUtilities.java    |   21 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    2 +-
 .../drill/exec/work/user/PlanSplitter.java      |    2 +-
 .../src/main/resources/drill-module.conf        |   35 +-
 .../java/org/apache/drill/TestBugFixes.java     |    5 +-
 .../drill/TestTpchDistributedConcurrent.java    |    2 +-
 .../physical/impl/agg/TestHashAggrSpill.java    |  141 +++
 .../physical/unit/BasicPhysicalOpUnitTest.java  |    3 +-
 exec/jdbc/pom.xml                               |    1 +
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |    2 +-
 .../templates/VariableLengthVectors.java        |    1 +
 36 files changed, 1800 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index b9d09a8..8010f85 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -142,6 +142,13 @@ drill.exec: {
     }
   },
   cache.hazel.subnets: ["*.*.*.*"],
+  spill: {
+     # These options are common to all spilling operators.
+     # They can be overriden, per operator (but this is just for
+     # backward compatibility, and may be deprecated in the future)
+     directories : [ "/tmp/drill/spill" ],
+     fs : "file:///"
+  }
   sort: {
     purge.threshold : 100,
     external: {
@@ -150,11 +157,26 @@ drill.exec: {
         batch.size : 4000,
         group.size : 100,
         threshold : 200,
+        # The 2 options below override the common ones
+        # they should be deprecated in the future
         directories : [ "/tmp/drill/spill" ],
         fs : "file:///"
       }
     }
   },
+  hashagg: {
+    # The partitions divide the work inside the hashagg, to ease
+    # handling spilling. This initial figure is tuned down when
+    # memory is limited.
+    #  Setting this option to 1 disables spilling !
+    num_partitions: 32,
+    spill: {
+        # The 2 options below override the common ones
+        # they should be deprecated in the future
+        directories : [ "/tmp/drill/spill" ],
+        fs : "file:///"
+    }
+  },
   memory: {
     top.max: 1000000000000,
     operator: {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 18f69d5..537377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -64,6 +64,12 @@ public interface ExecConstants {
   String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
   String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
 
+  // Spill boot-time Options common to all spilling operators
+  // (Each individual operator may override the common options)
+
+  String SPILL_FILESYSTEM = "drill.exec.spill.fs";
+  String SPILL_DIRS = "drill.exec.spill.directories";
+
   // External Sort Boot configuration
 
   String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -86,6 +92,22 @@ public interface ExecConstants {
 
   BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);
 
+  // Hash Aggregate Options
+
+  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
+  String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
+  LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means - no spilling
+  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
+  String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
+  LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);
+  // min batches is used for tuning (each partition needs so many batches when planning the number of partitions,
+  // or reserve this number when calculating whether the remaining available memory is too small and requires a spill.)
+  // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
+  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3);
+  String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
+  String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
 
   String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
   String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9d0182f..d569ae5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.codahale.metrics.MetricRegistry;
@@ -138,6 +140,60 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     va = container;
   }
 
+  // Like above, only preserve the original container and list of value-vectors
+  public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
+    final VectorContainer container = new VectorContainer();
+    final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
+
+      if (sv2 == null) {
+        sv2 = new SelectionVector2(allocator);
+      }
+      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
+    final List<ValueVector> vectorList = Lists.newArrayList();
+    final List<SerializedField> fieldList = batchDef.getFieldList();
+    for (SerializedField metaData : fieldList) {
+      final int dataLength = metaData.getBufferLength();
+      final MaterializedField field = MaterializedField.create(metaData);
+      final DrillBuf buf = allocator.buffer(dataLength);
+      final ValueVector vector;
+      try {
+        buf.writeBytes(input, dataLength);
+        vector = TypeHelper.getNewVector(field, allocator);
+        vector.load(metaData, buf);
+      } finally {
+        buf.release();
+      }
+      vectorList.add(vector);
+    }
+    container.addCollection(vectorList);
+    container.setRecordCount(recordCount);
+    myContainer.transferIn(container); // transfer the vectors
+    myContainer.buildSchema(svMode);
+    myContainer.setRecordCount(recordCount);
+    /*
+    // for debugging -- show values from the first row
+    Object tmp0 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+    Object tmp1 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+    Object tmp2 = (myContainer).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+    if (tmp0 != null && tmp1 != null && tmp2 != null) {
+      NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+      NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+      NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+
+      try {
+        logger.info("HASH AGG: Got a row = {} , {} , {}", vv0.getAccessor().get(0), vv1.getAccessor().get(0), vv2.getAccessor().get(0));
+      } catch (Exception e) { logger.info("HASH AGG: Got an exception = {}",e); }
+    }
+    else { logger.info("HASH AGG: got nulls !!!"); }
+    */
+    va = myContainer;
+  }
+
   public void writeToStreamAndRetain(OutputStream output) throws IOException {
     retain = true;
     writeToStream(output);

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a547e26..6f42250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
@@ -102,17 +104,31 @@ public abstract class AbstractBase implements PhysicalOperator{
     this.cost = cost;
   }
 
-  // Not available. Presumably because Drill does not currently use
-  // this value, though it does appear in some test physical plans.
-//  public void setMaxAllocation(long alloc) {
-//    maxAllocation = alloc;
-//  }
-
   @Override
   public long getMaxAllocation() {
     return maxAllocation;
   }
 
+  /**
+   * Any operator that supports spilling should override this method
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
+  }
+
+  /**
+   * Any operator that supports spilling should override this method (and return true)
+   * @return false
+   */
+  @Override @JsonIgnore
+  public boolean isBufferedOperator() { return false; }
+
+  // @Override
+  // public void setBufferedOperator(boolean bo) {}
+
   @Override
   public String getUserName() {
     return userName;

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index b1954ca..980f32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    */
   public long getMaxAllocation();
 
+  /**
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  public void setMaxAllocation(long maxAllocation);
+
+  /**
+   *
+   * @return True iff this operator manages its memory (including disk spilling)
+   */
+  @JsonIgnore
+  public boolean isBufferedOperator();
+
+  // public void setBufferedOperator(boolean bo);
+
   @JsonProperty("@id")
   public int getOperatorId();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 17848d0..cb9679d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -49,12 +49,19 @@ public class ExternalSort extends Sort {
     return CoreOperatorType.EXTERNAL_SORT_VALUE;
   }
 
-  // Set here, rather than the base class, because this is the only
-  // operator, at present, that makes use of the maximum allocation.
-  // Remove this, in favor of the base class version, when Drill
-  // sets the memory allocation for all operators.
-
+  /**
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
   public void setMaxAllocation(long maxAllocation) {
     this.maxAllocation = maxAllocation;
   }
+
+  /**
+   * The External Sort operator supports spilling
+   * @return true
+   */
+  @Override
+  public boolean isBufferedOperator() { return true; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index 4dafbe8..0614dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class);
 
+  private final AggPrelBase.OperatorPhase aggPhase;
   private final List<NamedExpression> groupByExprs;
   private final List<NamedExpression> aggrExprs;
 
@@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle {
 
   @JsonCreator
   public HashAggregate(@JsonProperty("child") PhysicalOperator child,
+                       @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase,
                        @JsonProperty("keys") List<NamedExpression> groupByExprs,
                        @JsonProperty("exprs") List<NamedExpression> aggrExprs,
                        @JsonProperty("cardinality") float cardinality) {
     super(child);
+    this.aggPhase = aggPhase;
     this.groupByExprs = groupByExprs;
     this.aggrExprs = aggrExprs;
     this.cardinality = cardinality;
   }
 
+  public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; }
+
   public List<NamedExpression> getGroupByExprs() {
     return groupByExprs;
   }
@@ -69,7 +75,9 @@ public class HashAggregate extends AbstractSingle {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new HashAggregate(child, groupByExprs, aggrExprs, cardinality);
+    HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality);
+    newHAG.setMaxAllocation(getMaxAllocation());
+    return newHAG;
   }
 
   @Override
@@ -77,5 +85,18 @@ public class HashAggregate extends AbstractSingle {
     return CoreOperatorType.HASH_AGGREGATE_VALUE;
   }
 
-
+  /**
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+  }
+  /**
+   * The Hash Aggregate operator supports spilling
+   * @return true
+   */
+  @Override
+  public boolean isBufferedOperator() { return true; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index dc913b1..97e0599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -55,7 +56,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
 
@@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
 
   private HashAggregator aggregator;
-  private final RecordBatch incoming;
+  private RecordBatch incoming;
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds;
   private TypedFieldId[] aggrOutFieldIds;      // field ids for the outgoing batch
   private final List<Comparator> comparators;
   private BatchSchema incomingSchema;
+  private boolean wasKilled;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
       GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
     super(popConfig, context);
     this.incoming = incoming;
+    wasKilled = false;
 
     final int numGrpByExprs = popConfig.getGroupByExprs().size();
     comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs);
@@ -136,15 +138,36 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       return IterOutcome.NONE;
     }
 
-    if (aggregator.buildComplete() && !aggregator.allFlushed()) {
-      // aggregation is complete and not all records have been output yet
-      return aggregator.outputCurrentBatch();
+    // if aggregation is complete and not all records have been output yet
+    if (aggregator.buildComplete() ||
+        // or: 1st phase need to return (not fully grouped) partial output due to memory pressure
+        aggregator.earlyOutput()) {
+      // then output the next batch downstream
+      HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
+      // if Batch returned, or end of data - then return the appropriate iter outcome
+      if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; }
+      if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
+      // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
+      incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
+    }
+
+    if (wasKilled) { // if kill() was called before, then finish up
+      aggregator.cleanup();
+      incoming.kill(false);
+      return IterOutcome.NONE;
     }
 
-    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+    // Read and aggregate records
+    // ( may need to run again if the spilled partition that was read
+    //   generated new partitions that were all spilled )
+    AggOutcome out;
+    do {
+      //
+      //  Read incoming batches and process their records
+      //
+      out = aggregator.doWork();
+    } while (out == AggOutcome.CALL_WORK_AGAIN);
 
-    AggOutcome out = aggregator.doWork();
-    logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
     switch (out) {
     case CLEANUP_AND_RETURN:
       container.zeroVectors();
@@ -153,6 +176,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       // fall through
     case RETURN_OUTCOME:
       return aggregator.getOutcome();
+
     case UPDATE_AGGREGATOR:
       context.fail(UserException.unsupportedError()
           .message(SchemaChangeException.schemaChanged(
@@ -175,7 +199,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
-    logger.debug("Creating new aggregator.");
     try {
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
@@ -198,7 +221,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
     top.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    top.saveCodeForDebugging(true);
+    // top.saveCodeForDebugging(true);
 
     container.clear();
 
@@ -266,7 +289,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
             HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
 
     agg.setup(popConfig, htConfig, context, this.stats,
-        oContext.getAllocator(), incoming, this,
+        oContext, incoming, this,
         aggrExprs,
         cgInner.getWorkspaceTypes(),
         groupByOutFieldIds,
@@ -314,6 +337,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
+    wasKilled = true;
     incoming.kill(sendUpstream);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1615200..38f0222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,82 +18,155 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
 
+import com.google.common.base.Stopwatch;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
+
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
 import org.apache.drill.exec.record.MaterializedField;
+
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.BatchSchema;
+
 import org.apache.drill.exec.record.VectorContainer;
+
+import org.apache.drill.exec.record.TypedFieldId;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+
 import org.apache.drill.exec.vector.AllocationHelper;
+
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
+
 import org.apache.drill.exec.vector.VariableWidthVector;
 
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+
 public abstract class HashAggTemplate implements HashAggregator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
 
-//  private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
-//  private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
-  private static final int VARIABLE_WIDTH_VALUE_SIZE = 50;
+  private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
+  private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
 
   private static final boolean EXTRA_DEBUG_1 = false;
   private static final boolean EXTRA_DEBUG_2 = false;
-//  private static final String TOO_BIG_ERROR =
-//      "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
-//  private boolean newSchema = false;
+  private static final boolean EXTRA_DEBUG_SPILL = false;
+
+  // Fields needed for partitioning (the groups into partitions)
+  private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
+  private int partitionMask; // numPartitions - 1
+  private int bitsInMask; // number of bits in the MASK
+  private int nextPartitionToReturn = 0; // which partition to return the next batch from
+  // The following members are used for logging, metrics, etc.
+  private int rowsInPartition = 0; // counts #rows in each partition
+  private int rowsNotSpilled = 0;
+  private int rowsSpilled = 0;
+  private int rowsSpilledReturned = 0;
+  private int rowsReturnedEarly = 0;
+
+  private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
+  private boolean is2ndPhase = false;
+  private boolean canSpill = true; // make it false in case can not spill
+  private ChainedHashTable baseHashTable;
+  private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
+  private int earlyPartition = 0; // which partition to return early
+
+  private long memoryLimit; // max memory to be used by this oerator
+  private long estMaxBatchSize = 0; // used for adjusting #partitions
+  private long estRowWidth = 0;
+  private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+  private long minBatchesPerPartition; // for tuning - num partitions and spill decision
+  private long plannedBatches = 0; // account for planned, but not yet allocated batches
+
   private int underlyingIndex = 0;
   private int currentIndex = 0;
   private IterOutcome outcome;
-//  private int outputCount = 0;
   private int numGroupedRecords = 0;
-  private int outBatchIndex = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+
   private int lastBatchOutputCount = 0;
   private RecordBatch incoming;
-//  private BatchSchema schema;
+  private BatchSchema schema;
   private HashAggBatch outgoing;
   private VectorContainer outContainer;
-//  private FragmentContext context;
+
+  private FragmentContext context;
+  private OperatorContext oContext;
   private BufferAllocator allocator;
 
-//  private HashAggregate hashAggrConfig;
-  private HashTable htable;
-  private ArrayList<BatchHolder> batchHolders;
+  private HashTable htables[];
+  private ArrayList<BatchHolder> batchHolders[];
+  private int outBatchIndex[];
+
+  // For handling spilling
+  private SpillSet spillSet;
+  SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
+  private OutputStream outputStream[]; // an output stream for each spilled partition
+  private int spilledBatchesCount[]; // count number of batches spilled, in each partition
+  private String spillFiles[];
+  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int originalPartition = -1; // the partition a secondary reads from
+
+  private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
+
+  private ArrayList<SpilledPartition> spilledPartitionsList;
+  private int operatorId; // for the spill file name
+
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
   private IndexPointer outStartIdxHolder;
   private IndexPointer outNumRecordsHolder;
   private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
-
-  ErrorCollector collector = new ErrorCollectorImpl();
+  private TypedFieldId[] groupByOutFieldIds;
 
   private MaterializedField[] materializedValueFields;
   private boolean allFlushed = false;
   private boolean buildComplete = false;
+  private boolean handlingSpills = false; // True once starting to process spill files
 
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
@@ -103,7 +176,15 @@ public abstract class HashAggTemplate implements HashAggregator {
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
-    RESIZING_TIME;
+    RESIZING_TIME,
+    NUM_PARTITIONS,
+    SPILLED_PARTITIONS, // number of partitions spilled to disk
+    SPILL_MB,         // Number of MB of data spilled to disk. This amount is first written,
+                      // then later re-read. So, disk I/O is twice this amount.
+                      // For first phase aggr -- this is an estimate of the amount of data
+                      // returned early (analogous to a spill in the 2nd phase).
+    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    ;
 
     // duplicate for hash ag
 
@@ -121,7 +202,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     private int batchOutputCount = 0;
 
     private int capacity = Integer.MAX_VALUE;
-    private boolean allocatedNextBatch = false;
 
     @SuppressWarnings("resource")
     public BatchHolder() {
@@ -145,8 +225,8 @@ public abstract class HashAggTemplate implements HashAggregator {
           if (vector instanceof FixedWidthVector) {
             ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
           } else if (vector instanceof VariableWidthVector) {
-            ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
-                HashTable.BATCH_SIZE);
+            // This case is never used .... a varchar falls under ObjectVector which is allocated on the heap !
+            ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE);
           } else if (vector instanceof ObjectVector) {
             ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
           } else {
@@ -166,20 +246,23 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
-      updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+      try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+      catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
       maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
       return true;
     }
 
     private void setup() {
-      setupInterior(incoming, outgoing, aggrValuesContainer);
+      try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+      catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
     }
 
     private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
       outStartIdxHolder.value = batchOutputCount;
       outNumRecordsHolder.value = 0;
       for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        outputRecordValues(i, batchOutputCount);
+        try { outputRecordValues(i, batchOutputCount); }
+        catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
         if (EXTRA_DEBUG_2) {
           logger.debug("Outputting values to output index: {}", batchOutputCount);
         }
@@ -204,24 +287,23 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{
     }
 
     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{
     }
   }
 
-
   @Override
   public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-      OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
-      LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
-      VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException {
+                    OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+                    LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
+                    VectorContainer outContainer) throws SchemaChangeException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -230,15 +312,34 @@ public abstract class HashAggTemplate implements HashAggregator {
       throw new IllegalArgumentException("Wrong number of workspace variables.");
     }
 
-//    this.context = context;
+    this.context = context;
     this.stats = stats;
-    this.allocator = allocator;
+    this.allocator = oContext.getAllocator();
+    this.oContext = oContext;
     this.incoming = incoming;
-//    this.schema = incoming.getSchema();
     this.outgoing = outgoing;
     this.outContainer = outContainer;
+    this.operatorId = hashAggrConfig.getOperatorId();
+
+    is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+    isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+    canSpill = isTwoPhase; // single phase can not spill
+
+    // Typically for testing - force a spill after a partition has more than so many batches
+    minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+
+    // Set the memory limit
+    memoryLimit = allocator.getLimit();
+    // Optional configured memory limit, typically used only for testing.
+    long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+    if (configLimit > 0) {
+      logger.warn("Memory limit was changed to {}",configLimit);
+      memoryLimit = Math.min(memoryLimit, configLimit);
+      allocator.setLimit(memoryLimit); // enforce at the allocator
+    }
 
-//    this.hashAggrConfig = hashAggrConfig;
+    // All the settings that require the number of partitions were moved into delayedSetup()
+    // which would be called later, after the actuall data first arrives
 
     // currently, hash aggregation is only applicable if there are group-by expressions.
     // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
@@ -266,112 +367,278 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
 
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList<BatchHolder>();
-    // First BatchHolder is created when the first put request is received.
 
     doSetup(incoming);
   }
 
+  /**
+   *  Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming
+   *  This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1");
+    }
+    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuation
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) {
+            canSpill = false;  // 2nd phase needs at least 2 to make progress
+            logger.warn("Spilling was disabled - not enough memory available for internal partitioning");
+          }
+          break;
+        }
+      }
+    }
+    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions] ;
+    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
+    outBatchIndex = new int[numPartitions] ;
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList<SpilledPartition>();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (ClassTransformationException e) {
+        throw UserException.unsupportedError(e)
+            .message("Code generation error - likely an error in the code.")
+            .build(logger);
+      } catch (IOException e) {
+        throw UserException.resourceError(e)
+            .message("IO Error while creating a hash table.")
+            .build(logger);
+      } catch (SchemaChangeException sce) {
+        throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
+      }
+      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+    }
+  }
+  /**
+   * get new incoming: (when reading spilled files like an "incoming")
+   * @return The (newly replaced) incoming
+   */
+  @Override
+  public RecordBatch getNewIncoming() { return newIncoming; }
+
+  private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
+    baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+    this.incoming = newIncoming;
+    currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
+    nextPartitionToReturn = 0;
+    for (int i = 0; i < numPartitions; i++ ) {
+      htables[i].reinit(newIncoming);
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = new ArrayList<BatchHolder>();
+      }
+      outBatchIndex[i] = 0;
+      outputStream[i] = null;
+      spilledBatchesCount[i] = 0;
+      spillFiles[i] = null;
+    }
+  }
+
+  /**
+   *  Update the estimated max batch size to be used in the Hash Aggr Op.
+   *  using the record batch size to get the row width.
+   * @param incoming
+   */
+  private void updateEstMaxBatchSize(RecordBatch incoming) {
+    if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
+    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    logger.trace("Incoming sizer: {}",sizer);
+    // An empty batch only has the schema, can not tell actual length of varchars
+    // else use the actual varchars length, each capped at 50 (to match the space allocation)
+    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+
+    // Get approx max (varchar) column width to get better memory allocation
+    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
+
+    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
+        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+
+    if ( estMaxBatchSize > memoryLimit ) {
+      logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+    }
+  }
+
+  /**
+   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+   *  Once complete, get the incoming NEXT batch and process it as well, etc.
+   *  For 1st phase, may return when an early output needs to be performed.
+   *
+   * @return Agg outcome status
+   */
   @Override
   public AggOutcome doWork() {
-    try {
-      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
-      // StreamingAggregate which does somethings conditionally in the outer try block.
-      // In the future HashAggregate may also need to perform some actions conditionally
-      // in the outer try block.
-
-      outside:
-      while (true) {
-        // loop through existing records, aggregating the values as necessary.
-        if (EXTRA_DEBUG_1) {
-          logger.debug("Starting outer loop of doWork()...");
+
+    while (true) {
+
+      // This would be called only once - first time actual data arrives on incoming
+      if ( schema == null && incoming.getRecordCount() > 0 ) {
+        this.schema = incoming.getSchema();
+        currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+        // Calculate the number of partitions based on actual incoming data
+        delayedSetup();
+      }
+
+      //
+      //  loop through existing records in this batch, aggregating the values as necessary.
+      //
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Starting outer loop of doWork()...");
+      }
+      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
         }
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG_2) {
-            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-          }
-          checkGroupAndAggrValues(currentIndex);
+        checkGroupAndAggrValues(currentIndex);
+        // If adding a group discovered a memory pressure during 1st phase, then start
+        // outputing some partition downstream in order to free memory.
+        if ( earlyOutput ) {
+          outputCurrentBatch();
+          incIndex(); // next time continue with the next incoming row
+          return AggOutcome.RETURN_OUTCOME;
         }
+      }
+
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Processed {} records", underlyingIndex);
+      }
 
-        if (EXTRA_DEBUG_1) {
-          logger.debug("Processed {} records", underlyingIndex);
+      // Cleanup the previous batch since we are done processing it.
+      for (VectorWrapper<?> v : incoming) {
+        v.getValueVector().clear();
+      }
+      //
+      // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+      // from one of the spill files (The spill case is handled differently here to avoid
+      // collecting stats on the spilled records)
+      //
+      if ( handlingSpills ) {
+        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+      } else {
+        long beforeAlloc = allocator.getAllocatedMemory();
+
+        // Get the next RecordBatch from the incoming (i.e. upstream operator)
+        outcome = outgoing.next(0, incoming);
+
+        // If incoming batch is bigger than our estimate - adjust the estimate to match
+        long afterAlloc = allocator.getAllocatedMemory();
+        long incomingBatchSize = afterAlloc - beforeAlloc;
+        if ( estMaxBatchSize < incomingBatchSize) {
+          logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
+          estMaxBatchSize = incomingBatchSize;
         }
+      }
 
-        try {
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Received IterOutcome of {}", outcome);
+      }
 
-          while (true) {
-            // Cleanup the previous batch since we are done processing it.
-            for (VectorWrapper<?> v : incoming) {
-              v.getValueVector().clear();
-            }
-            IterOutcome out = outgoing.next(0, incoming);
-            if (EXTRA_DEBUG_1) {
-              logger.debug("Received IterOutcome of {}", out);
-            }
-            switch (out) {
-              case OUT_OF_MEMORY:
-              case NOT_YET:
-                this.outcome = out;
-                return AggOutcome.RETURN_OUTCOME;
-
-              case OK_NEW_SCHEMA:
-                if (EXTRA_DEBUG_1) {
-                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-                }
-//                newSchema = true;
-                this.cleanup();
-                // TODO: new schema case needs to be handled appropriately
-                return AggOutcome.UPDATE_AGGREGATOR;
-
-              case OK:
-                resetIndex();
-                if (incoming.getRecordCount() == 0) {
-                  continue;
-                } else {
-                  checkGroupAndAggrValues(currentIndex);
-                  incIndex();
-
-                  if (EXTRA_DEBUG_1) {
-                    logger.debug("Continuing outside loop");
-                  }
-                  continue outside;
-                }
-
-              case NONE:
-                // outcome = out;
-
-                buildComplete = true;
-
-                updateStats(htable);
-
-                // output the first batch; remaining batches will be output
-                // in response to each next() call by a downstream operator
-
-                outputCurrentBatch();
-
-                // return setOkAndReturn();
-                return AggOutcome.RETURN_OUTCOME;
-
-              case STOP:
-              default:
-                outcome = out;
-                return AggOutcome.CLEANUP_AND_RETURN;
-            }
+      // Handle various results from getting the next batch
+      switch (outcome) {
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+          return AggOutcome.RETURN_OUTCOME;
+
+        case OK_NEW_SCHEMA:
+          if (EXTRA_DEBUG_1) {
+            logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
           }
+          this.cleanup();
+          // TODO: new schema case needs to be handled appropriately
+          return AggOutcome.UPDATE_AGGREGATOR;
 
-        } finally {
-          // placeholder...
-        }
+        case OK:
+          currentBatchRecordCount = incoming.getRecordCount(); // size of next batch
+
+          resetIndex(); // initialize index (a new batch needs to be processed)
+
+          if (EXTRA_DEBUG_1) {
+            logger.debug("Continue to start processing the next batch");
+          }
+          break;
+
+        case NONE:
+          resetIndex(); // initialize index (in case spill files need to be processed)
+
+          buildComplete = true;
+
+          updateStats(htables);
+
+          // output the first batch; remaining batches will be output
+          // in response to each next() call by a downstream operator
+          AggIterOutcome aggOutcome = outputCurrentBatch();
+
+          if ( aggOutcome == AggIterOutcome.AGG_RESTART ) {
+            // Output of first batch returned a RESTART (all new partitions were spilled)
+            return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
+          }
+
+          if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; }
+
+          return AggOutcome.RETURN_OUTCOME;
+
+        case STOP:
+        default:
+          return AggOutcome.CLEANUP_AND_RETURN;
       }
-    } finally {
     }
   }
 
+  /**
+   *   Allocate space for the returned aggregate columns
+   *   (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
+   * @param records
+   */
   private void allocateOutgoing(int records) {
     // Skip the keys and only allocate for outputting the workspace values
     // (keys will be output through splitAndTransfer)
@@ -382,14 +649,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     while (outgoingIter.hasNext()) {
       @SuppressWarnings("resource")
       ValueVector vv = outgoingIter.next().getValueVector();
-//      MajorType type = vv.getField().getType();
 
-      /*
-       * In build schema we use the allocation model that specifies exact record count
-       * so we need to stick with that allocation model until DRILL-2211 is resolved. Using
-       * 50 as the average bytes per value as is used in HashTable.
-       */
-      AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0);
+      AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
     }
   }
 
@@ -400,45 +661,82 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+      stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+          (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+    }
+    // clean (and deallocate) each partition
+    for ( int i = 0; i < numPartitions; i++) {
+          if (htables[i] != null) {
+              htables[i].clear();
+              htables[i] = null;
+          }
+          if ( batchHolders[i] != null) {
+              for (BatchHolder bh : batchHolders[i]) {
+                    bh.clear();
+              }
+              batchHolders[i].clear();
+              batchHolders[i] = null;
+          }
+
+          // delete any (still active) output spill file
+          if ( outputStream[i] != null && spillFiles[i] != null) {
+            try {
+              outputStream[i].close();
+              outputStream[i] = null;
+              spillSet.delete(spillFiles[i]);
+              spillFiles[i] = null;
+            } catch(IOException e) {
+              logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
+            }
+          }
     }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+        SpilledPartition sp = spilledPartitionsList.remove(0);
+        try {
+          spillSet.delete(sp.spillFile);
+        } catch(IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+        }
+    }
+    // Delete the currently handled (if any) spilled file
+    if ( newIncoming != null ) { newIncoming.close();  }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }
 
-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
+  // then reallocate them in pristine state to allow the partition to continue receiving rows
+  private void reinitPartition(int part) /* throws SchemaChangeException /*, IOException */ {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//    this.outcome = IterOutcome.OK;
-//    for (VectorWrapper<?> v : outgoing) {
-//      v.getValueVector().getMutator().setValueCount(outputCount);
-//    }
-//    return AggOutcome.RETURN_OUTCOME;
-//  }
-
   private final void incIndex() {
     underlyingIndex++;
-    if (underlyingIndex >= incoming.getRecordCount()) {
+    if (underlyingIndex >= currentBatchRecordCount) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
   }
 
   private final void resetIndex() {
@@ -446,71 +744,337 @@ public abstract class HashAggTemplate implements HashAggregator {
     incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return) ?
+   * - The current partition (to which a new bach holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   *  @param currPart - The partition that hit the memory limit (gets a priority)
+   *  @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+    // first find the largest spilled partition
+    int maxSizeSpilled = -1;
+    int indexMaxSpilled = -1;
+    for (int isp = 0; isp < numPartitions; isp++ ) {
+      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+        maxSizeSpilled = batchHolders[isp].size();
+        indexMaxSpilled = isp;
+      }
+    }
+    // Give the current (if already spilled) some priority
+    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+      maxSizeSpilled = currPartSize ;
+      indexMaxSpilled = currPart;
+    }
+    // now find the largest non-spilled partition
+    int maxSize = -1;
+    int indexMax = -1;
+    // Use the largest spilled (if found) as a base line, with a factor of 4
+    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+      indexMax = indexMaxSpilled;
+      maxSize = 4 * maxSizeSpilled ;
+    }
+    for ( int insp = 0; insp < numPartitions; insp++) {
+      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+        indexMax = insp;
+        maxSize = batchHolders[insp].size();
+      }
+    }
+    // again - priority to the current partition
+    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+      return currPart;
+    }
+    if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
+      return -1; // try skipping this spill
+    }
+    return indexMax;
+  }
+
+  /**
+   * Iterate through the batches of the given partition, writing them to a file
+   *
+   * @param part The partition (number) to spill
+   */
+  private void spillAPartition(int part) {
+
+    ArrayList<BatchHolder> currPartition = batchHolders[part];
+    rowsInPartition = 0;
+    if ( EXTRA_DEBUG_SPILL ) {
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+    }
+
+    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+    // If this is the first spill for this partition, create an output stream
+    if ( ! isSpilled(part) ) {
+
+      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+      try {
+        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+      } catch (IOException ioe) {
+        throw UserException.resourceError(ioe)
+            .message("Hash Aggregation failed to open spill file: " + spillFiles[part])
+            .build(logger);
+      }
+    }
+
+    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+      // get the number of records in the batch holder that are pending output
+      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+      rowsInPartition += numPendingOutput;  // for logging
+      rowsSpilled += numPendingOutput;
+
+      allocateOutgoing(numPendingOutput);
+
+      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+      int numOutputRecords = outNumRecordsHolder.value;
+
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+      // set the value count for outgoing batch value vectors
+      /* int i = 0; */
+      for (VectorWrapper<?> v : outgoing) {
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
+        /*
+        // print out the first row to be spilled ( varchar, varchar, bigint )
+        try {
+          if (i++ < 2) {
+            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          } else {
+            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          }
+        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
+        */
+      }
+
+      outContainer.setRecordCount(numPendingOutput);
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      try {
+        outputBatch.writeToStream(outputStream[part]);
+      } catch (IOException ioe) {
+        throw UserException.dataWriteError(ioe)
+            .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+            .build(logger);
+      }
+      outContainer.zeroVectors();
+      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+    }
+
+    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+  }
+
+  private void addBatchHolder(int part) {
+
     BatchHolder bh = newBatchHolder();
-    batchHolders.add(bh);
+    batchHolders[part].add(bh);
 
     if (EXTRA_DEBUG_1) {
-      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
     }
 
     bh.setup();
   }
 
-  // Overridden in the generated class when created as plain Java code.
-
+  // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder() {
     return new BatchHolder();
   }
 
+  /**
+   * Output the next batch from partition "nextPartitionToReturn"
+   *
+   * @return iteration outcome (e.g., OK, NONE ...)
+   */
   @Override
-  public IterOutcome outputCurrentBatch() {
-    if (outBatchIndex >= batchHolders.size()) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
+  public AggIterOutcome outputCurrentBatch() {
+
+    // when incoming was an empty batch, just finish up
+    if ( schema == null ) {
+      logger.trace("Incoming was empty; output is an empty batch.");
+      this.outcome = IterOutcome.NONE; // no records were read
+      allFlushed = true;
+      return AggIterOutcome.AGG_NONE;
     }
 
-    // get the number of records in the batch holder that are pending output
-    int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
+    // Initialization (covers the case of early output)
+    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
+    int currOutBatchIndex = outBatchIndex[earlyPartition];
+    int partitionToReturn = earlyPartition;
+
+    if ( ! earlyOutput ) {
+      // Update the next partition to return (if needed)
+      // skip fully returned (or spilled) partitions
+      while (nextPartitionToReturn < numPartitions) {
+        //
+        // If this partition was spilled - spill the rest of it and skip it
+        //
+        if ( isSpilled(nextPartitionToReturn) ) {
+          spillAPartition(nextPartitionToReturn); // spill the rest
+          SpilledPartition sp = new SpilledPartition();
+          sp.spillFile = spillFiles[nextPartitionToReturn];
+          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
+          sp.cycleNum = cycleNum; // remember the current cycle
+          sp.origPartn = nextPartitionToReturn; // for debugging / filename
+          sp.prevOrigPartn = originalPartition; // for debugging / filename
+          spilledPartitionsList.add(sp);
+
+          reinitPartition(nextPartitionToReturn); // free the memory
+          long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
+          spillSet.tallyWriteBytes(posn); // for the IO stats
+          try {
+            outputStream[nextPartitionToReturn].close();
+          } catch (IOException ioe) {
+            throw UserException.resourceError(ioe)
+                .message("IO Error while closing output stream")
+                .build(logger);
+          }
+          outputStream[nextPartitionToReturn] = null;
+        }
+        else {
+          currPartition = batchHolders[nextPartitionToReturn];
+          currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
+          // If curr batch (partition X index) is not empty - proceed to return it
+          if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
+            break;
+          }
+        }
+        nextPartitionToReturn++; // else check next partition
+      }
+
+      // if passed the last partition - either done or need to restart and read spilled partitions
+      if (nextPartitionToReturn >= numPartitions) {
+        // The following "if" is probably never used; due to a similar check at the end of this method
+        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+          allFlushed = true;
+          this.outcome = IterOutcome.NONE;
+          if ( is2ndPhase ) {
+            stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+                (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+          }
+          return AggIterOutcome.AGG_NONE;  // then return NONE
+        }
+        // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
+        buildComplete = false; // go back and call doWork() again
+        handlingSpills = true; // beginning to work on the spill files
+        // pick a spilled partition; set a new incoming ...
+        SpilledPartition sp = spilledPartitionsList.remove(0);
+        // Create a new "incoming" out of the spilled partition spill file
+        newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
+        originalPartition = sp.origPartn; // used for the filename
+        logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
+        // Initialize .... new incoming, new set of partitions
+        try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); }
+        // update the cycle num if needed
+        // The current cycle num should always be one larger than in the spilled partition
+        if ( cycleNum == sp.cycleNum ) {
+          cycleNum = 1 + sp.cycleNum;
+          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+          // report first spill or memory stressful situations
+          if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
+          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
+          if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
+          if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
+          if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
+        }
+        if ( EXTRA_DEBUG_SPILL ) {
+          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
+              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
+        }
+        return AggIterOutcome.AGG_RESTART;
+      }
+
+      partitionToReturn = nextPartitionToReturn ;
 
-    if (numPendingOutput == 0) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
     }
 
+    // get the number of records in the batch holder that are pending output
+    int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+    // The following accounting is for logging, metrics, etc.
+    rowsInPartition += numPendingOutput ;
+    if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
+    else { rowsSpilledReturned += numPendingOutput; }
+    if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
+
     allocateOutgoing(numPendingOutput);
 
-    batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+    currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
     int numOutputRecords = outNumRecordsHolder.value;
 
     if (EXTRA_DEBUG_1) {
       logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
     }
-    this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
 
     // set the value count for outgoing batch value vectors
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(numOutputRecords);
     }
 
-//    outputCount += numOutputRecords;
-
     this.outcome = IterOutcome.OK;
 
-    logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+      logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
+        rowsNotSpilled+rowsSpilledReturned,
+        rowsSpilled);
+    }
 
     lastBatchOutputCount = numOutputRecords;
-    outBatchIndex++;
-    if (outBatchIndex == batchHolders.size()) {
-      allFlushed = true;
+    outBatchIndex[partitionToReturn]++;
+    // if just flushed the last batch in the partition
+    if (outBatchIndex[partitionToReturn] == currPartition.size()) {
+
+      if ( EXTRA_DEBUG_SPILL ) {
+        logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
+            earlyOutput ? "(Early)" : "",
+            partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
+      }
+      rowsInPartition = 0; // reset to count for the next partition
+
+      // deallocate memory used by this partition, and re-initialize
+      reinitPartition(partitionToReturn);
 
-      logger.debug("HashAggregate: All batches flushed.");
+      if ( earlyOutput ) {
 
-      // cleanup my internal state since there is nothing more to return
-      this.cleanup();
+        if ( EXTRA_DEBUG_SPILL ) {
+          logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
+        }
+        outBatchIndex[earlyPartition] = 0; // reset, for next time
+        earlyOutput = false ; // done with early output
+      }
+      else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+
+        allFlushed = true; // next next() call will return NONE
+
+        logger.trace("HashAggregate: All batches flushed.");
+
+        // cleanup my internal state since there is nothing more to return
+        this.cleanup();
+      }
     }
 
-    return this.outcome;
+    return AggIterOutcome.AGG_OK;
   }
 
   @Override
@@ -522,11 +1086,33 @@ public abstract class HashAggTemplate implements HashAggregator {
   public boolean buildComplete() {
     return buildComplete;
   }
+  @Override
+  public boolean earlyOutput() { return earlyOutput; }
 
   public int numGroupedRecords() {
     return numGroupedRecords;
   }
 
+  /**
+   *  Generate a detailed error message in case of "Out Of Memory"
+   * @return err msg
+   */
+  private String getOOMErrorMsg() {
+    String errmsg;
+    if ( !isTwoPhase ) {
+      errmsg = "Single Phase Hash Aggregate operator can not spill." ;
+    } else if ( ! canSpill ) {  // 2nd phase, with only 1 partition
+      errmsg = "Too little memory available to operator to facilitate spilling.";
+    } else { // a bug ?
+      errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+      ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+      if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
+    }
+    errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
+
+    return errmsg;
+  }
+
   // Check if a group is present in the hash table; if not, insert it in the hash table.
   // The htIdxHolder contains the index of the group in the hash table container; this same
   // index is also used for the aggregation values maintained by the hash aggregate.
@@ -535,6 +1121,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       throw new IllegalArgumentException("Invalid incoming row index.");
     }
 
+    assert ! earlyOutput;
+
     /** for debugging
      Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
      BigIntVector vv0 = null;
@@ -546,44 +1134,189 @@ public abstract class HashAggTemplate implements HashAggregator {
      holder.value = vv0.getAccessor().get(incomingRowIdx) ;
      }
      */
+    /*
+    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
+      // for debugging -- show the first row from a spilled batch
+      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+
+      if (tmp0 != null && tmp1 != null && tmp2 != null) {
+        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
+        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
+      }
+    }
+    */
+    // The hash code is computed once, then its lower bits are used to determine the
+    // partition to use, and the higher bits determine the location in the hash table.
+    int hashCode;
+    try {
+      htables[0].updateBatches();
+      hashCode = htables[0].getHashCode(incomingRowIdx);
+    } catch (SchemaChangeException e) {
+      throw new UnsupportedOperationException("Unexpected schema change", e);
+    }
 
-    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
+    // right shift hash code for secondary (or tertiary...) spilling
+    for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
 
-    int currentIdx = htIdxHolder.value;
+    int currentPartition = hashCode & partitionMask ;
+    hashCode >>>= bitsInMask;
+    HashTable.PutStatus putStatus = null;
+    long allocatedBefore = allocator.getAllocatedMemory();
 
-    // get the batch index and index within the batch
-    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
-      addBatchHolder();
+    // Insert the key columns into the hash table
+    try {
+      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+    } catch (OutOfMemoryException exc) {
+      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+    } catch (SchemaChangeException e) {
+      throw new UnsupportedOperationException("Unexpected schema change", e);
     }
-    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
-    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+    int currentIdx = htIdxHolder.value;
 
-    // Check if we have almost filled up the workspace vectors and add a batch if necessary
-    if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
-      htable.addNewKeyBatch();
-      addBatchHolder();
-      bh.allocatedNextBatch = true;
+    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
+    if ( addedMem > 0 ) {
+      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
     }
 
+    // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
+    // (for the aggregate columns) needs to be created
+    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+      try {
+        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+
+        addBatchHolder(currentPartition);
+
+        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
+        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
+        logger.trace("MEMORY CHECK AGG: added {}  total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+        // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+        if ( totalAddedMem > estMaxBatchSize ) {
+          logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
+          estMaxBatchSize = totalAddedMem;
+        }
+      } catch (OutOfMemoryException exc) {
+        throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+      }
+    }
+    BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
 
     if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
       numGroupedRecords++;
     }
+
+    // ===================================================================================
+    // If the last batch just became full - that is the time to check the memory limits !!
+    // If exceeded, then need to spill (if 2nd phase) or output early (1st)
+    // (Skip this if cannot spill; in such case an OOM may be encountered later)
+    // ===================================================================================
+    if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
+
+      plannedBatches++; // planning to allocate one more batch
+
+      // calculate the (max) new memory needed now
+      long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
+      for ( HashTable ht : htables ) {
+        hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize();
+      }
+
+      // Plan ahead for at least MIN batches, to account for size changing, and some overhead
+      long maxMemoryNeeded = minBatchesPerPartition * plannedBatches *
+          ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) +
+          hashTableDoublingSizeNeeded;
+
+      // log a detailed debug message explaining why a spill may be needed
+      logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
+          "Memory needed {}, Est batch size {}, mem limit {}",
+          allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
+          batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
+      //
+      //   Spill if the allocated memory plus the memory needed exceeds the memory limit.
+      //
+      if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+
+        // Pick a "victim" partition to spill or return
+        int victimPartition = chooseAPartitionToFlush(currentPartition);
+
+        // In case no partition has more than one batch -- try and "push the limits"; maybe next
+        // time the spill could work.
+        if ( victimPartition < 0 ) { return; }
+
+        if ( is2ndPhase ) {
+          long before = allocator.getAllocatedMemory();
+
+          spillAPartition(victimPartition);
+          logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+          // Re-initialize (free memory, then recreate) the partition just spilled/returned
+          reinitPartition(victimPartition);
+
+          // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+          if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+              int victimPartition2 = chooseAPartitionToFlush(victimPartition);
+              if ( victimPartition2 < 0 ) { return; }
+              long after = allocator.getAllocatedMemory();
+              spillAPartition(victimPartition2);
+              reinitPartition(victimPartition2);
+              logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+                  before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+              logger.trace("Second Partition Spilled: {}",victimPartition2);
+          }
+        }
+        else {
+          // 1st phase need to return a partition early in order to free some memory
+          earlyOutput = true;
+          earlyPartition = victimPartition;
+
+          if ( EXTRA_DEBUG_SPILL ) {
+            logger.debug("picked partition {} for early output", victimPartition);
+          }
+        }
+      }
+    }
   }
 
-  private void updateStats(HashTable htable) {
-    htable.getStats(htStats);
+  /**
+   * Updates the stats at the time after all the input was read.
+   * Note: For spilled partitions, their hash-table stats from before the spill are lost.
+   * And the SPILLED_PARTITIONS only counts the spilled partitions in the primary, not SECONDARY etc.
+   * @param htables
+   */
+  private void updateStats(HashTable[] htables) {
+    if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+    long numSpilled = 0;
+    HashTableStats newStats = new HashTableStats();
+    // sum the stats from all the partitions
+    for (int ind = 0; ind < numPartitions; ind++) {
+      htables[ind].getStats(newStats);
+      htStats.addStats(newStats);
+      if (isSpilled(ind)) {
+        numSpilled++;
+      }
+    }
     this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+    if ( is2ndPhase ) {
+      this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+    }
+    if ( rowsReturnedEarly > 0 ) {
+      stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
+          (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+    }
   }
 
   // Code-generated methods (implemented in HashAggBatch)
-  public abstract void doSetup(@Named("incoming") RecordBatch incoming);
+  public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
 
-  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException;
 
-  public abstract boolean resetValues();
+  public abstract boolean resetValues() throws SchemaChangeException;
 
 }