You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/22 23:40:34 UTC

[2/2] drill git commit: DRILL-5694: Handle OOM in HashAggr by spill and retry, reserve memory, spinner

DRILL-5694: Handle OOM in HashAggr by spill and retry, reserve memory, spinner


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d77ab318
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d77ab318
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d77ab318

Branch: refs/heads/master
Commit: d77ab31836e5e5b88c124e0c7540bccd0544dedb
Parents: 4c99f0c
Author: Ben-Zvi <bb...@mapr.com>
Authored: Wed Sep 20 14:39:21 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Thu Sep 21 18:35:08 2017 -0700

----------------------------------------------------------------------
 .../exceptions/RetryAfterSpillException.java    |  26 ++
 .../drill/common/exceptions/UserException.java  |  36 ++
 .../org/apache/drill/exec/ExecConstants.java    |  12 +-
 .../physical/impl/aggregate/HashAggBatch.java   |  11 +-
 .../impl/aggregate/HashAggTemplate.java         | 369 +++++++++++++------
 .../physical/impl/aggregate/HashAggregator.java |   6 +-
 .../impl/aggregate/SpilledRecordbatch.java      |  12 +-
 .../physical/impl/common/ChainedHashTable.java  |   1 +
 .../exec/physical/impl/common/HashTable.java    |   3 +-
 .../physical/impl/common/HashTableTemplate.java | 129 ++++---
 .../exec/physical/impl/join/HashJoinBatch.java  |   6 +-
 .../physical/impl/spill/RecordBatchSizer.java   |   2 +-
 .../exec/physical/impl/spill/SpillSet.java      |  26 +-
 .../impl/xsort/managed/ExternalSortBatch.java   |   3 +-
 .../server/options/SystemOptionManager.java     |   1 +
 .../src/main/resources/drill-module.conf        |  29 +-
 .../physical/impl/agg/TestHashAggrSpill.java    | 163 ++++----
 .../impl/xsort/managed/TestSortImpl.java        |  12 +-
 .../apache/drill/exec/memory/BaseAllocator.java |   8 +-
 .../codegen/templates/NullableValueVectors.java |   2 +-
 .../templates/VariableLengthVectors.java        |   2 +-
 21 files changed, 523 insertions(+), 336 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java b/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
new file mode 100644
index 0000000..8fd5a95
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+/**
+ *  A special (checked) exception, to be caught by the caller, who is then supposed to
+ *  free memory by spilling and try the failed operation again.
+ */
+public class RetryAfterSpillException extends Exception {
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index dd4fd36..4ea97e5 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -22,6 +22,12 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.slf4j.Logger;
 
+import java.io.File;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.lang.management.ManagementFactory;
+
+import static java.lang.Thread.sleep;
 /**
  * Base class for all user exception. The goal is to separate out common error conditions where we can give users
  * useful feedback.
@@ -536,6 +542,36 @@ public class UserException extends DrillRuntimeException {
      * @return user exception
      */
     public UserException build(final Logger logger) {
+
+      // To allow for debugging:
+      //
+      //   A spinner code to make the execution stop here while the file '/tmp/drill/spin' exists
+      // Can be used to attach a debugger, use jstack, etc
+      // (do "clush -a touch /tmp/drill/spin" to turn this on across all the cluster nodes, and to
+      //  release the spinning threads do "clush -a rm /tmp/drill/spin")
+      // The processID of the spinning thread (along with the error message) should then be found
+      // in a file like  /tmp/drill/spin4148663301172491613.tmp
+      File spinFile = new File("/tmp/drill/spin");
+      if ( spinFile.exists() ) {
+        File tmpDir = new File("/tmp/drill");
+        File outErr = null;
+        try {
+          outErr = File.createTempFile("spin", ".tmp", tmpDir);
+          BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
+          bw.write("Spinning process: " + ManagementFactory.getRuntimeMXBean().getName()
+          /* After upgrading to JDK 9 - replace with: ProcessHandle.current().getPid() */);
+          bw.write("\nError cause: " +
+            (errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: " + ErrorHelper.getRootMessage(cause)) : message));
+          bw.close();
+        } catch (Exception ex) {
+          logger.warn("Failed creating a spinner tmp message file: {}", ex);
+        }
+        while (spinFile.exists()) {
+          try { sleep(1_000); } catch (Exception ex) { /* ignore interruptions */ }
+        }
+        try { outErr.delete(); } catch (Exception ex) { } // cleanup - remove err msg file
+      }
+
       if (uex != null) {
         return uex;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4fa846f..2b32569 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -93,18 +93,20 @@ public interface ExecConstants {
 
   // Hash Aggregate Options
 
-  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
   String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
   LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
-  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
   String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
   LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
   // min batches is used for tuning (each partition needs so many batches when planning the number of partitions,
   // or reserve this number when calculating whether the remaining available memory is too small and requires a spill.)
   // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
-  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
-  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
-  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
+  // Can be turned off mainly for testing. Memory prediction is used to decide on when to spill to disk; with this option off,
+  // spill would be triggered only by another mechanism -- "catch OOMs and then spill".
+  String HASHAGG_USE_MEMORY_PREDICTION_KEY = "exec.hashagg.use_memory_prediction";
+  BooleanValidator HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR = new BooleanValidator(HASHAGG_USE_MEMORY_PREDICTION_KEY);
+
   String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
   String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
   String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled";

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 97e0599..314cf6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.expression.IfExpression;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
@@ -251,6 +252,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       groupByOutFieldIds[i] = container.add(vv);
     }
 
+    int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
     for (i = 0; i < numAggrExprs; i++) {
       NamedExpression ne = popConfig.getAggrExprs().get(i);
       final LogicalExpression expr =
@@ -268,6 +270,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         continue;
       }
 
+      if ( expr instanceof FunctionHolderExpression ) {
+         String funcName = ((FunctionHolderExpression) expr).getName();
+         if ( funcName.equals("sum") || funcName.equals("max") || funcName.equals("min") ) {extraNonNullColumns++;}
+      }
       final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
       @SuppressWarnings("resource")
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -288,12 +294,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
             HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
 
-    agg.setup(popConfig, htConfig, context, this.stats,
-        oContext, incoming, this,
+    agg.setup(popConfig, htConfig, context, oContext, incoming, this,
         aggrExprs,
         cgInner.getWorkspaceTypes(),
         groupByOutFieldIds,
-        this.container);
+        this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
 
     return agg;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index a3b1ceb..1e65c49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Named;
 
 import com.google.common.base.Stopwatch;
-
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
@@ -60,8 +60,6 @@ import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
 
-import org.apache.drill.exec.proto.UserBitShared;
-
 import org.apache.drill.exec.record.MaterializedField;
 
 import org.apache.drill.exec.record.RecordBatch;
@@ -109,14 +107,21 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
   private boolean is2ndPhase = false;
-  private boolean canSpill = true; // make it false in case can not spill
+  private boolean is1stPhase = false;
+  private boolean canSpill = true; // make it false in case can not spill/return-early
   private ChainedHashTable baseHashTable;
   private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
   private int earlyPartition = 0; // which partition to return early
-
-  private long memoryLimit; // max memory to be used by this oerator
-  private long estMaxBatchSize = 0; // used for adjusting #partitions
-  private long estRowWidth = 0;
+  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
+  private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
+  private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
+  private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
+  private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
+  private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
+  private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
+  private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
+  private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
+  private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
   private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
   private long minBatchesPerPartition; // for tuning - num partitions and spill decision
   private long plannedBatches = 0; // account for planned, but not yet allocated batches
@@ -297,10 +302,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-                    OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
-                    LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
-                    VectorContainer outContainer) throws SchemaChangeException, IOException {
+  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -310,25 +312,27 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     this.context = context;
-    this.stats = stats;
+    this.stats = oContext.getStats();
     this.allocator = oContext.getAllocator();
     this.oContext = oContext;
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.outContainer = outContainer;
     this.operatorId = hashAggrConfig.getOperatorId();
+    this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
 
     is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
     isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+    is1stPhase = isTwoPhase && ! is2ndPhase ;
     canSpill = isTwoPhase; // single phase can not spill
 
     // Typically for testing - force a spill after a partition has more than so many batches
-    minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+    minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
 
     // Set the memory limit
-    memoryLimit = allocator.getLimit();
+    long memoryLimit = allocator.getLimit();
     // Optional configured memory limit, typically used only for testing.
-    long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+    long configLimit = context.getOptions().getOption(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR);
     if (configLimit > 0) {
       logger.warn("Memory limit was changed to {}",configLimit);
       memoryLimit = Math.min(memoryLimit, configLimit);
@@ -370,6 +374,10 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
 
+    // Start calculating the row widths (with the extra columns; the rest would be done in updateEstMaxBatchSize() )
+    estRowWidth = extraRowBytes;
+    estValuesRowWidth = extraRowBytes;
+
     doSetup(incoming);
   }
 
@@ -382,19 +390,25 @@ public abstract class HashAggTemplate implements HashAggregator {
     final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
 
     // Set the number of partitions from the configuration (raise to a power of two, if needed)
-    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
-    if ( numPartitions == 1 ) {
+    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+    if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early return with 1 partition
       canSpill = false;
       logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
     }
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
 
-    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
     else {
       // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
       updateEstMaxBatchSize(incoming);
     }
-    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    // create "reserved memory" and adjust the memory limit down
+    reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
+    long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - reserveOutgoingMemory ;
+    long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
+    if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little memory available"); }
+    allocator.setLimit(newMemoryLimit);
+
     if ( !canSpill ) { // single phase, or spill disabled by configuation
       numPartitions = 1; // single phase should use only a single partition (to save memory)
     } else { // two phase
@@ -464,6 +478,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
       this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
     }
+    // Initialize the value vectors in the generated code (which point to the incoming or outgoing fields)
+    try { htables[0].updateBatches(); } catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); };
   }
   /**
    * get new incoming: (when reading spilled files like an "incoming")
@@ -500,22 +516,45 @@ public abstract class HashAggTemplate implements HashAggregator {
    */
   private void updateEstMaxBatchSize(RecordBatch incoming) {
     if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
+    // Use the sizer to get the input row width and the length of the longest varchar column
     RecordBatchSizer sizer = new RecordBatchSizer(incoming);
     logger.trace("Incoming sizer: {}",sizer);
     // An empty batch only has the schema, can not tell actual length of varchars
     // else use the actual varchars length, each capped at 50 (to match the space allocation)
-    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
-    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
 
     // Get approx max (varchar) column width to get better memory allocation
-    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
     maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
 
-    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
-        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+    //
+    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
+    // (which is used to decide when to spill)
+    // Also calculate the values batch size (used as a reserve to overcome an OOM)
+    //
+    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+    int fieldId = 0;
+    while (outgoingIter.hasNext()) {
+      ValueVector vv = outgoingIter.next().getValueVector();
+      MaterializedField mr = vv.getField();
+      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
+          TypeHelper.getSize(mr.getType());
+      estRowWidth += fieldSize;
+      estOutputRowWidth += fieldSize;
+      if ( fieldId < numGroupByOutFields ) { fieldId++; }
+      else { estValuesRowWidth += fieldSize; }
+    }
+    // multiply by the max number of rows in a batch to get the final estimated max size
+    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
+    // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
+    estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
+    estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
+
+    logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {}  memory limit: {}  max column width: {}",
+        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
 
-    if ( estMaxBatchSize > memoryLimit ) {
-      logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+    if ( estMaxBatchSize > allocator.getLimit() ) {
+      logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,allocator.getLimit());
     }
   }
 
@@ -545,16 +584,19 @@ public abstract class HashAggTemplate implements HashAggregator {
       if (EXTRA_DEBUG_1) {
         logger.debug("Starting outer loop of doWork()...");
       }
-      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+      while (underlyingIndex < currentBatchRecordCount) {
         if (EXTRA_DEBUG_2) {
           logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
         }
         checkGroupAndAggrValues(currentIndex);
+
+        if ( retrySameIndex ) { retrySameIndex = false; }  // need to retry this row (e.g. we had an OOM)
+        else { incIndex(); } // next time continue with the next incoming row
+
         // If adding a group discovered a memory pressure during 1st phase, then start
         // outputing some partition downstream in order to free memory.
         if ( earlyOutput ) {
           outputCurrentBatch();
-          incIndex(); // next time continue with the next incoming row
           return AggOutcome.RETURN_OUTCOME;
         }
       }
@@ -573,9 +615,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       // collecting stats on the spilled records)
       //
       long memAllocBeforeNext = allocator.getAllocatedMemory();
-
       if ( handlingSpills ) {
-        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+        outcome = incoming.next(); // get it from the SpilledRecordBatch
       } else {
         // Get the next RecordBatch from the incoming (i.e. upstream operator)
         outcome = outgoing.next(0, incoming);
@@ -646,6 +687,46 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   /**
+   *   Use the reserved values-batch memory (if available) to try and preempt an OOM
+   */
+  private void useReservedValuesMemory() {
+    // try to preempt an OOM: give the reserve back to the allocator by raising its limit by that amount
+    long reservedMemory = reserveValueBatchMemory;
+    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+
+    reserveValueBatchMemory = 0; // the reserve is now used up; restoreReservedMemory() may re-create it
+  }
+  /**
+   *   Use the reserved outgoing-output memory (if available) to try and preempt an OOM
+   */
+  private void useReservedOutgoingMemory() {
+    // try to preempt an OOM: give the reserve back to the allocator by raising its limit by that amount
+    long reservedMemory = reserveOutgoingMemory;
+    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+
+    reserveOutgoingMemory = 0; // the reserve is now used up; restoreReservedMemory() may re-create it
+  }
+  /**
+   *  Try to restore both memory reserves (outgoing output first, then the values batch).
+   *  A reserve is re-created only when it is currently zero and the unallocated memory under the limit exceeds its estimated size.
+   */
+  private void restoreReservedMemory() {
+    if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
+      long memAvail = allocator.getLimit() - allocator.getAllocatedMemory(); // unallocated memory under the current limit
+      if ( memAvail > estOutgoingAllocSize) { // enough room: carve the reserve out by lowering the limit
+        allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
+        reserveOutgoingMemory = estOutgoingAllocSize;
+      }
+    }
+    if ( 0 == reserveValueBatchMemory ) { // values-batch reserve was used (or never set)
+      long memAvail = allocator.getLimit() - allocator.getAllocatedMemory(); // recomputed: the limit may have just been lowered above
+      if ( memAvail > estValuesBatchSize) { // enough room: carve the reserve out by lowering the limit
+        allocator.setLimit(allocator.getLimit() - estValuesBatchSize);
+        reserveValueBatchMemory = estValuesBatchSize;
+      }
+    }
+  }
+  /**
    *   Allocate space for the returned aggregate columns
    *   (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
    * @param records
@@ -657,12 +738,25 @@ public abstract class HashAggTemplate implements HashAggregator {
     for (int i = 0; i < numGroupByOutFields; i++) {
       outgoingIter.next();
     }
+
+    // try to preempt an OOM by using the reserved memory
+    useReservedOutgoingMemory();
+    long allocatedBefore = allocator.getAllocatedMemory();
+
     while (outgoingIter.hasNext()) {
       @SuppressWarnings("resource")
       ValueVector vv = outgoingIter.next().getValueVector();
 
       AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
     }
+
+    long memAdded = allocator.getAllocatedMemory() - allocatedBefore;
+    if ( memAdded > estOutgoingAllocSize ) {
+      logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...",memAdded,estOutgoingAllocSize);
+      estOutgoingAllocSize = memAdded;
+    }
+    // try to restore the reserve
+    restoreReservedMemory();
   }
 
   @Override
@@ -738,6 +832,9 @@ public abstract class HashAggTemplate implements HashAggregator {
       batchHolders[part].clear();
     }
     batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+
+    // in case the reserve memory was used, try to restore
+    restoreReservedMemory();
   }
 
   private final void incIndex() {
@@ -751,7 +848,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   private final void resetIndex() {
-    underlyingIndex = -1;
+    underlyingIndex = -1; // will become 0 in incIndex()
     incIndex();
   }
 
@@ -768,10 +865,11 @@ public abstract class HashAggTemplate implements HashAggregator {
    * Need to weigh the above three options.
    *
    *  @param currPart - The partition that hit the memory limit (gets a priority)
-   *  @return The partition (number) chosen to be spilled
+   *  @param tryAvoidCurr - When true, give negative priority to the current partition
+   * @return The partition (number) chosen to be spilled
    */
-  private int chooseAPartitionToFlush(int currPart) {
-    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+  private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
+    if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
     int currPartSize = batchHolders[currPart].size();
     if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
     // first find the largest spilled partition
@@ -784,7 +882,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
     // Give the current (if already spilled) some priority
-    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+    if ( ! tryAvoidCurr && isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
       maxSizeSpilled = currPartSize ;
       indexMaxSpilled = currPart;
     }
@@ -803,7 +901,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
     // again - priority to the current partition
-    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+    if ( ! tryAvoidCurr && ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
       return currPart;
     }
     if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
@@ -898,7 +996,6 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     BatchHolder bh = newBatchHolder();
     batchHolders[part].add(bh);
-
     if (EXTRA_DEBUG_1) {
       logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
     }
@@ -1117,7 +1214,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       errmsg = "Too little memory available to operator to facilitate spilling.";
     } else { // a bug ?
       errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
-      ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+      ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
+      if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
     }
     errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
@@ -1129,10 +1227,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   // The htIdxHolder contains the index of the group in the hash table container; this same
   // index is also used for the aggregation values maintained by the hash aggregate.
   private void checkGroupAndAggrValues(int incomingRowIdx) {
-    if (incomingRowIdx < 0) {
-      throw new IllegalArgumentException("Invalid incoming row index.");
-    }
-
+    assert incomingRowIdx >= 0;
     assert ! earlyOutput;
 
     /** for debugging
@@ -1165,7 +1260,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // partition to use, and the higher bits determine the location in the hash table.
     int hashCode;
     try {
-      htables[0].updateBatches();
+      // htables[0].updateBatches();
       hashCode = htables[0].getHashCode(incomingRowIdx);
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException("Unexpected schema change", e);
@@ -1179,19 +1274,45 @@ public abstract class HashAggTemplate implements HashAggregator {
     HashTable.PutStatus putStatus = null;
     long allocatedBeforeHTput = allocator.getAllocatedMemory();
 
+    // Proactive spill - in case there is no reserve memory - spill and retry putting later
+    if ( reserveValueBatchMemory == 0 && canSpill ) {
+      logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.",
+        is1stPhase ? "early return" : "spill");
+
+      doSpill(currentPartition); // spill to free some memory
+
+      retrySameIndex = true;
+      return; // to retry this put()
+    }
+
     // ==========================================
     // Insert the key columns into the hash table
     // ==========================================
     try {
+
       putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+
+    } catch (RetryAfterSpillException re) {
+      if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
+
+      logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.",
+            is1stPhase ? "early return" : "spill");
+
+      // for debugging - in case there's a leak
+      long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
+      if ( memDiff > 0 ) { logger.warn("Leak: HashTable put() OOM left behind {} bytes allocated",memDiff); }
+
+      doSpill(currentPartition); // spill to free some memory
+
+      retrySameIndex = true;
+      return; // to retry this put()
     } catch (OutOfMemoryException exc) {
-      throw new OutOfMemoryException(getOOMErrorMsg("HT was: " + allocatedBeforeHTput), exc); // may happen when can not spill
+        throw new OutOfMemoryException(getOOMErrorMsg("HT was: " + allocatedBeforeHTput), exc);
     } catch (SchemaChangeException e) {
-      throw new UnsupportedOperationException("Unexpected schema change", e);
+        throw new UnsupportedOperationException("Unexpected schema change", e);
     }
-
-    boolean needToCheckIfSpillIsNeeded = false;
     long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+    boolean needToCheckIfSpillIsNeeded = allocatedBeforeAggCol > allocatedBeforeHTput ;
 
     // Add an Aggr batch if needed:
     //
@@ -1201,32 +1322,38 @@ public abstract class HashAggTemplate implements HashAggregator {
     if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
       try {
 
-        addBatchHolder(currentPartition);
+        useReservedValuesMemory(); // try to preempt an OOM by using the reserve
+
+        addBatchHolder(currentPartition);  // allocate a new (internal) values batch
+
+        restoreReservedMemory(); // restore the reserve, if possible
+        // A reason to check for a spill - In case restore-reserve failed
+        needToCheckIfSpillIsNeeded = ( 0 == reserveValueBatchMemory );
 
         if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
         long totalAddedMem = allocator.getAllocatedMemory() - allocatedBeforeHTput;
-        logger.trace("MEMORY CHECK AGG: allocated now {}, added {}  total (with HT) added {}", allocator.getAllocatedMemory(),
-            allocator.getAllocatedMemory() - allocatedBeforeAggCol, totalAddedMem);
-        // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+        long aggValuesAddedMem = allocator.getAllocatedMemory() - allocatedBeforeAggCol;
+        logger.trace("MEMORY CHECK AGG: allocated now {}, added {}, total (with HT) added {}", allocator.getAllocatedMemory(),
+            aggValuesAddedMem, totalAddedMem);
+        // resize the batch estimates if needed (e.g., varchars may take more memory than estimated)
         if (totalAddedMem > estMaxBatchSize) {
           logger.trace("Adjusting Batch size estimate from {} to {}", estMaxBatchSize, totalAddedMem);
           estMaxBatchSize = totalAddedMem;
-          needToCheckIfSpillIsNeeded = true; // better check the memory limits again now
+          needToCheckIfSpillIsNeeded = true;
+        }
+        if (aggValuesAddedMem > estValuesBatchSize) {
+          logger.trace("Adjusting Values Batch size from {} to {}",estValuesBatchSize, aggValuesAddedMem);
+          estValuesBatchSize = aggValuesAddedMem;
+          needToCheckIfSpillIsNeeded = true;
         }
       } catch (OutOfMemoryException exc) {
-        throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc); // may happen when can not spill
+          throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc);
       }
     } else if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST ) {
         // If a batch just became full (i.e. another batch would be allocated soon) -- then need to
         // check (later, see below) if the memory limits are too close, and if so -- then spill !
         plannedBatches++; // planning to allocate one more batch
         needToCheckIfSpillIsNeeded = true;
-    } else if ( allocatedBeforeAggCol > allocatedBeforeHTput ) {
-        // if HT put() allocated memory (other than a new batch; e.g. HT doubling, or buffer resizing)
-        // then better check again whether a spill is needed
-        needToCheckIfSpillIsNeeded = true;
-
-        logger.trace("MEMORY CHECK HT: was allocated {}  added {} partition {}",allocatedBeforeHTput, allocatedBeforeAggCol - allocatedBeforeHTput,currentPartition);
     }
 
     // =================================================================
@@ -1240,67 +1367,83 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     // ===================================================================================
-    // If the last batch just became full - that is the time to check the memory limits !!
-    // If exceeded, then need to spill (if 2nd phase) or output early (1st)
-    // (Skip this if cannot spill; in such case an OOM may be encountered later)
+    // If the last batch just became full, or other "memory growing" events happened, then
+    // this is the time to check the memory limits !!
+    // If the limits were exceeded, then need to spill (if 2nd phase) or output early (1st)
+    // (Skip this if cannot spill, or not checking memory limits; in such case an OOM may
+    // be encountered later - and some OOM cases are recoverable by spilling and retrying)
     // ===================================================================================
-    if ( needToCheckIfSpillIsNeeded && canSpill ) {
+    if ( needToCheckIfSpillIsNeeded && canSpill && useMemoryPrediction ) {
+      spillIfNeeded(currentPartition);
+    }
+  }
 
-      // calculate the (max) new memory needed now
-      // Plan ahead for at least MIN batches
-      long maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) *
-          ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) );
+  private void spillIfNeeded(int currentPartition) { spillIfNeeded(currentPartition, false);}
+  private void doSpill(int currentPartition) { spillIfNeeded(currentPartition, true);}
+  /**
+   *  Spill (or return early, if 1st phase) if too little available memory is left
+   *  @param currentPartition - the preferred candidate for spilling (unless forceSpill is set)
+   * @param forceSpill -- spill unconditionally (no memory checks), trying to avoid the current partition
+   */
+  private void spillIfNeeded(int currentPartition, boolean forceSpill) {
+    long maxMemoryNeeded = 0;
+    if ( !forceSpill ) { // need to check the memory in order to decide
+      // calculate the (max) new memory needed now; plan ahead for at least MIN batches
+      maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_SIZE * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
       int maxSize = 1;
-      for ( int insp = 0; insp < numPartitions; insp++) { maxSize = Math.max(maxSize, batchHolders[insp].size()); }
+      for (int insp = 0; insp < numPartitions; insp++) {
+        maxSize = Math.max(maxSize, batchHolders[insp].size());
+      }
       maxMemoryNeeded += MAX_BATCH_SIZE * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
 
       // log a detailed debug message explaining why a spill may be needed
-      logger.debug("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
-          "Max memory needed {}, Est batch size {}, mem limit {}",
-          allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
-          batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
-      //
-      //   Spill if the allocated memory plus the memory needed exceeds the memory limit.
-      //
-      if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
-
-        // Pick a "victim" partition to spill or return
-        int victimPartition = chooseAPartitionToFlush(currentPartition);
-
-        // In case no partition has more than one batch -- try and "push the limits"; maybe next
-        // time the spill could work.
-        if ( victimPartition < 0 ) { return; }
-
-        if ( is2ndPhase ) {
-          long before = allocator.getAllocatedMemory();
-
-          spillAPartition(victimPartition);
-          logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
-
-          // Re-initialize (free memory, then recreate) the partition just spilled/returned
-          reinitPartition(victimPartition);
-
-          // in some "edge" cases (e.g. testing), spilling one partition may not be enough
-          if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
-              int victimPartition2 = chooseAPartitionToFlush(victimPartition);
-              if ( victimPartition2 < 0 ) { return; }
-              long after = allocator.getAllocatedMemory();
-              spillAPartition(victimPartition2);
-              reinitPartition(victimPartition2);
-              logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
-                  before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
-              logger.trace("Second Partition Spilled: {}",victimPartition2);
-          }
+      logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + "Max memory needed {}, Est batch size {}, mem limit {}",
+          allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : "1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded,
+          estMaxBatchSize, allocator.getLimit());
+    }
+    //
+    //   Spill if (forced, or) the allocated memory plus the memory needed exceed the memory limit.
+    //
+    if ( forceSpill || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+
+      // Pick a "victim" partition to spill or return
+      int victimPartition = chooseAPartitionToFlush(currentPartition, forceSpill);
+
+      // In case no partition has more than one batch -- try and "push the limits"; maybe next
+      // time the spill could work.
+      if ( victimPartition < 0 ) { return; }
+
+      if ( is2ndPhase ) {
+        long before = allocator.getAllocatedMemory();
+
+        spillAPartition(victimPartition);
+        logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+        // Re-initialize (free memory, then recreate) the partition just spilled/returned
+        reinitPartition(victimPartition);
+
+        // In case spilling did not free enough memory to recover the reserves
+        boolean spillAgain = reserveOutgoingMemory == 0 || reserveValueBatchMemory == 0;
+        // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+        if ( spillAgain || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+            int victimPartition2 = chooseAPartitionToFlush(victimPartition, true);
+            if ( victimPartition2 < 0 ) { return; }
+            long after = allocator.getAllocatedMemory();
+            spillAPartition(victimPartition2);
+            reinitPartition(victimPartition2);
+            logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+                before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+            logger.trace("Second Partition Spilled: {}",victimPartition2);
         }
-        else {
-          // 1st phase need to return a partition early in order to free some memory
-          earlyOutput = true;
-          earlyPartition = victimPartition;
+      }
+      else {
+        // The 1st phase needs to return a partition early in order to free some memory
+        earlyOutput = true;
+        earlyPartition = victimPartition;
 
-          if ( EXTRA_DEBUG_SPILL ) {
-            logger.debug("picked partition {} for early output", victimPartition);
-          }
+        if ( EXTRA_DEBUG_SPILL ) {
+          logger.debug("picked partition {} for early output", victimPartition);
         }
       }
     }
@@ -1335,7 +1478,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     if ( rowsReturnedEarly > 0 ) {
       stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
-          (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+          (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 21d5a4a..16b5499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.record.RecordBatch;
@@ -47,10 +46,7 @@ public interface HashAggregator {
   // OK - batch returned, NONE - end of data, RESTART - call again
   public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
 
-  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-                             OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
-                             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
-                             VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
+  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
 
   public abstract IterOutcome getOutcome();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index b05353e..ac4b29d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -64,7 +64,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
 
     try {
       this.spillStream = this.spillSet.openForInput(spillFile);
-    } catch (IOException e) { throw new RuntimeException(e);}
+    } catch (IOException e) {
+      throw UserException.resourceError(e).build(HashAggBatch.logger);
+    }
 
     next(); // initialize the container
   }
@@ -124,6 +126,8 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
 
+    if ( ! context.shouldContinue() ) { return IterOutcome.STOP; }
+
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
       return IterOutcome.NONE;
@@ -155,6 +159,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
     return IterOutcome.OK;
   }
 
+  /**
+   * Note: ignoring any IO errors (e.g. file not found)
+   */
   @Override
   public void close() {
     container.clear();
@@ -167,9 +174,8 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
       spillSet.delete(spillFile);
     }
     catch (IOException e) {
-      throw new RuntimeException(e);
+      /* ignore */
     } finally {
-      spillSet.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 387dad1..db9622f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -141,6 +141,7 @@ public class ChainedHashTable {
     // This code is called from generated code, so to step into this code,
     // persist the code generated in HashAggBatch also.
     // top.saveCodeForDebugging(true);
+    top.preferPlainJava(true); // use a subclass
     ClassGenerator<HashTable> cg = top.getRoot();
     ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1e6570f..3749e3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
 
 public interface HashTable {
 
@@ -58,7 +59,7 @@ public interface HashTable {
 
   public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
 
   public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 336026c..7a086aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -25,6 +25,7 @@ import javax.inject.Named;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -39,6 +40,7 @@ import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
 
 public abstract class HashTableTemplate implements HashTable {
 
@@ -131,8 +133,8 @@ public abstract class HashTableTemplate implements HashTable {
       boolean success = false;
       try {
         for (VectorWrapper<?> w : htContainerOrig) {
-          @SuppressWarnings("resource")
           ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+          htContainer.add(vv); // add to container before actual allocation (to allow clearing in case of an OOM)
 
           // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
           // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
@@ -148,8 +150,6 @@ public abstract class HashTableTemplate implements HashTable {
           } else {
             vv.allocateNew();
           }
-
-          htContainer.add(vv);
         }
 
         links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -158,19 +158,17 @@ public abstract class HashTableTemplate implements HashTable {
       } finally {
         if (!success) {
           htContainer.clear();
-          if (links != null) {
-            links.clear();
-          }
+          if (links != null) { links.clear();}
         }
       }
     }
 
     private void init(IntVector links, IntVector hashValues, int size) {
       for (int i = 0; i < size; i++) {
-        links.getMutator().setSafe(i, EMPTY_SLOT);
+        links.getMutator().set(i, EMPTY_SLOT);
       }
       for (int i = 0; i < size; i++) {
-        hashValues.getMutator().setSafe(i, 0);
+        hashValues.getMutator().set(i, 0);
       }
       links.getMutator().setValueCount(size);
       hashValues.getMutator().setValueCount(size);
@@ -215,6 +213,8 @@ public abstract class HashTableTemplate implements HashTable {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
 
       setValue(incomingRowIdx, currentIdxWithinBatch);
+      // setValue may hit an OOM while doubling one of the VarChar key value vectors.
+      // This would be caught and retried later (setValue() is idempotent)
 
       // the previous entry in this hash chain should now point to the entry in this currentIdx
       if (lastEntryBatch != null) {
@@ -223,8 +223,8 @@ public abstract class HashTableTemplate implements HashTable {
 
       // since this is the last entry in the hash chain, the links array at position currentIdx
       // will point to a null (empty) slot
-      links.getMutator().setSafe(currentIdxWithinBatch, EMPTY_SLOT);
-      hashValues.getMutator().setSafe(currentIdxWithinBatch, hashValue);
+      links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
+      hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
 
       maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
 
@@ -235,7 +235,7 @@ public abstract class HashTableTemplate implements HashTable {
     }
 
     private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) {
-      links.getMutator().setSafe(lastEntryIdxWithinBatch, currentIdx);
+      links.getMutator().set(lastEntryIdxWithinBatch, currentIdx);
     }
 
     private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) {
@@ -254,9 +254,9 @@ public abstract class HashTableTemplate implements HashTable {
         int newStartIdx = newStartIndices.getAccessor().get(bucketIdx);
 
         if (newStartIdx == EMPTY_SLOT) { // new bucket was empty
-          newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry
-          newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
-          newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+          newStartIndices.getMutator().set(bucketIdx, entryIdx); // update the start index to point to entry
+          newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
+          newHashValues.getMutator().set(entryIdxWithinBatch, hash);
 
           if (EXTRA_DEBUG) {
             logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, " +
@@ -279,9 +279,9 @@ public abstract class HashTableTemplate implements HashTable {
             }
 
             if (bh == this && newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
-              newLinks.getMutator().setSafe(idxWithinBatch, entryIdx);
-              newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
-              newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+              newLinks.getMutator().set(idxWithinBatch, entryIdx);
+              newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
+              newHashValues.getMutator().set(entryIdxWithinBatch, hash);
 
               if (EXTRA_DEBUG) {
                 logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, " +
@@ -293,10 +293,10 @@ public abstract class HashTableTemplate implements HashTable {
 
               break;
             } else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
-              bh.links.getMutator().setSafe(idxWithinBatch, entryIdx); // update the link in the other batch
-              newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this
+              bh.links.getMutator().set(idxWithinBatch, entryIdx); // update the link in the other batch
+              newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this
               // batch to mark end of the hash chain
-              newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+              newHashValues.getMutator().set(entryIdxWithinBatch, hash);
 
               if (EXTRA_DEBUG) {
                 logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, " +
@@ -402,8 +402,8 @@ public abstract class HashTableTemplate implements HashTable {
 
     private void clear() {
       htContainer.clear();
-      links.clear();
-      hashValues.clear();
+      if ( links != null ) { links.clear(); }
+      if ( hashValues != null ) { hashValues.clear(); }
     }
 
     // Only used for internal debugging. Get the value vector at a particular index from the htContainer.
@@ -566,6 +566,17 @@ public abstract class HashTableTemplate implements HashTable {
     return rounded;
   }
 
+  private void retryAfterOOM(boolean batchAdded) throws RetryAfterSpillException {
+    // If a batch was added then undo; otherwise when retrying this put() we'd miss a NEW_BATCH_ADDED
+    if ( batchAdded ) {
+      logger.trace("OOM - Removing index {} from the batch holders list",batchHolders.size() - 1);
+      BatchHolder bh = batchHolders.remove(batchHolders.size() - 1);
+      bh.clear();
+    }
+    freeIndex--;
+    throw new RetryAfterSpillException();
+  }
+
   public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
     return getHashBuild(incomingRowIdx);
   }
@@ -583,7 +594,7 @@ public abstract class HashTableTemplate implements HashTable {
    * @return Status - the key(s) was ADDED or was already PRESENT
    */
   @Override
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException {
 
     int bucketIndex = getBucketIndex(hashCode, numBuckets());
     int startIdx = startIndices.getAccessor().get(bucketIndex);
@@ -609,17 +620,42 @@ public abstract class HashTableTemplate implements HashTable {
 
     // no match was found, so insert a new entry
     currentIdx = freeIndex++;
-    boolean addedBatch = addBatchIfNeeded(currentIdx);
+    boolean addedBatch = false;
+    try {  // ADD A BATCH
+      addedBatch = addBatchIfNeeded(currentIdx);
+    } catch (OutOfMemoryException OOME) {
+      retryAfterOOM( currentIdx < batchHolders.size() * BATCH_SIZE );
+    }
+
+    try { // INSERT ENTRY
+      BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
+
+      bh.insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
+      numEntries++;
+    } catch (OutOfMemoryException OOME) { retryAfterOOM( addedBatch ); }
+
+    try {  // RESIZE HT
+      /* Resize hash table if needed and transfer the metadata
+       * Resize only after inserting the current entry into the hash table
+       * Otherwise our calculated lastEntryBatch and lastEntryIdxWithinBatch
+       * become invalid after resize.
+       */
+      resizeAndRehashIfNeeded();
+    } catch (OutOfMemoryException OOME) {
+      numEntries--; // undo - insert entry
+      if (lastEntryBatch != null) { // undo last added link in chain (if any)
+        lastEntryBatch.updateLinks(lastEntryIdxWithinBatch, EMPTY_SLOT);
+      }
+      retryAfterOOM( addedBatch );
+    }
 
     if (EXTRA_DEBUG) {
       logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
     }
 
-    insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
-
     // if there was no hash chain at this bucket, need to update the start index array
     if (startIdx == EMPTY_SLOT) {
-      startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
+      startIndices.getMutator().set(getBucketIndex(hashCode, numBuckets()), currentIdx);
     }
     htIdxHolder.value = currentIdx;
     return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
@@ -628,21 +664,6 @@ public abstract class HashTableTemplate implements HashTable {
         PutStatus.KEY_ADDED;     // otherwise
   }
 
-  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException {
-
-    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-
-    bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx);
-    numEntries++;
-
-      /* Resize hash table if needed and transfer the metadata
-       * Resize only after inserting the current entry into the hash table
-       * Otherwise our calculated lastEntryBatch and lastEntryIdx
-       * becomes invalid after resize.
-       */
-    resizeAndRehashIfNeeded();
-  }
-
   // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
   @Override
   public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
@@ -690,8 +711,6 @@ public abstract class HashTableTemplate implements HashTable {
       return;
     }
 
-    long t0 = System.currentTimeMillis();
-
     if (EXTRA_DEBUG) {
       logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
     }
@@ -704,13 +723,23 @@ public abstract class HashTableTemplate implements HashTable {
       return;
     }
 
-    int newSize = 2 * tableSize;
+    int newTableSize = 2 * tableSize;
+    newTableSize = roundUpToPowerOf2(newTableSize);
 
-    tableSize = roundUpToPowerOf2(newSize);
+    // If there is not enough memory available to allocate the new hash table, plus the new links
+    // and the new hash-values (which replace the existing ones inside rehash()), throw an OOM
+    if ( 4 /* sizeof(int) */ * ( newTableSize + 2 * HashTable.BATCH_SIZE /* links + hashValues */)
+        >= allocator.getLimit() - allocator.getAllocatedMemory()) {
+      throw new OutOfMemoryException("Resize Hash Table");
+    }
+
+    tableSize = newTableSize;
     if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
     }
 
+    long t0 = System.currentTimeMillis();
+
     // set the new threshold based on the new table size and load factor
     threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
 
@@ -744,11 +773,7 @@ public abstract class HashTableTemplate implements HashTable {
    *
    */
   public void reset() {
-    // long before = allocator.getAllocatedMemory();
     this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
-    // long after = allocator.getAllocatedMemory();
-
-    // logger.debug("Reinit Hash Table: Memory before {} After {}  Percent after: {}",before,after, (100 * after ) / before);
 
     freeIndex = 0; // all batch holders are gone
     // reallocate batch holders, and the hash table to the original size
@@ -759,7 +784,7 @@ public abstract class HashTableTemplate implements HashTable {
     incomingBuild = newIncoming;
     reset();
     try {
-      updateBatches();  // Needed ? (to update the new incoming?)
+      updateBatches();  // Needed to update the value vectors in the generated code with the new incoming
     } catch (SchemaChangeException e) {
       throw new IllegalStateException("Unexpected schema change", e);
     } catch(IndexOutOfBoundsException ioob) {
@@ -777,7 +802,7 @@ public abstract class HashTableTemplate implements HashTable {
     IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator);
     vector.allocateNew(size);
     for (int i = 0; i < size; i++) {
-      vector.getMutator().setSafe(i, initialValue);
+      vector.getMutator().set(i, initialValue);
     }
     vector.getMutator().setValueCount(size);
     return vector;

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 8c899aa..481bea8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
@@ -358,8 +359,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         // For every record in the build batch , hash the key columns
         for (int i = 0; i < currentRecordCount; i++) {
           int hashCode = hashTable.getHashCode(i);
-          hashTable.put(i, htIndex, hashCode);
-
+          try {
+            hashTable.put(i, htIndex, hashCode);
+          } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put");} // Hash Join can not retry yet
                         /* Use the global index returned by the hash table, to store
                          * the current record index and batch index. This will be used
                          * later when we probe and find a match.

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index f76757f..3e021ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -438,7 +438,7 @@ public class RecordBatchSizer {
   public boolean hasSv2() { return hasSv2; }
   public int avgDensity() { return avgDensity; }
   public int netSize() { return netBatchSize; }
-  public int maxSize() { return maxSize; }
+  public int maxAvgColumnSize() { return maxSize / rowCount; }
 
   public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index a269954..9a6420a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -317,7 +316,8 @@ public class SpillSet {
 
     @Override
     public void deleteDir(String fragmentSpillDir) throws IOException {
-      new File(baseDir, fragmentSpillDir).delete();
+      boolean deleted = new File(baseDir, fragmentSpillDir).delete();
+      if ( ! deleted ) { throw new IOException("Failed to delete: " + fragmentSpillDir);}
     }
 
     @Override
@@ -359,17 +359,10 @@ public class SpillSet {
   private long writeBytes;
 
   public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
-    this(context.getConfig(), context.getHandle(), popConfig,
-         // Endpoint appears to be null in some tests.
-         context.getDrillbitContext() == null ? null :
-         context.getDrillbitContext().getEndpoint());
+    this(context.getConfig(), context.getHandle(), popConfig);
   }
 
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig, DrillbitEndpoint ep) {
-    this(context.getConfig(), context.getHandle(), popConfig, ep);
-  }
-
-  public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig, DrillbitEndpoint ep) {
+  public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig) {
     String operName;
 
     // Set the spill options from the configuration
@@ -421,15 +414,7 @@ public class SpillSet {
       fileManager = new HadoopFileManager(spillFs);
     }
 
-    // If provided with a prefix to identify the Drillbit, prepend that to the
-    // spill directory.
-
-    String nodeDir = "";
-    if (ep != null  &&  ep.getAddress() != null) {
-      nodeDir = ep.getAddress() + "-" + ep.getUserPort() + "_";
-    }
-    spillDirName = String.format("%s%s_%s_%s-%s-%s",
-        nodeDir,
+    spillDirName = String.format("%s_%s_%s-%s-%s",
         QueryIdHelper.getQueryId(handle.getQueryId()),
         operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
   }
@@ -491,6 +476,7 @@ public class SpillSet {
           // since this is meant to be used in a batches's cleanup, we don't propagate the exception
           logger.warn("Unable to delete spill directory " + path,  e);
       }
+      currSpillDirs.clear(); // in case close() is called again
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea1d605..6a97c29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -217,8 +217,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     this.incoming = incoming;
 
     SortConfig sortConfig = new SortConfig(context.getConfig());
-    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(),
-                                     popConfig, context.getIdentity());
+    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
     OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 75bcc1f..d1d56a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,6 +118,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
+      new OptionDefinition(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR), // for testing
       new OptionDefinition(ExecConstants.HASHAGG_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashAgg
       new OptionDefinition(ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION),
       new OptionDefinition(ExecConstants.OUTPUT_FORMAT_VALIDATOR),

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4e7bbdb..0b8d605 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -229,22 +229,14 @@ drill.exec: {
     directories: [ "/tmp/drill/spill" ]
   },
   hashagg: {
-    // An internal tuning; should not be changed
-    min_batches_per_partition: 3,
-    // An option for testing - force a memory limit
-    mem_limit: 0,
-    // The max number of partitions in each hashagg operator
-    // This number is tuned down when memory is limited
-    // Setting it to 1 means: No spilling
-    num_partitions: 32,
     spill: {
-        // -- The 2 options below can be used to override the common ones
-        // -- (common to all spilling operators)
-        // File system to use. Local file system by default.
-        fs: ${drill.exec.spill.fs},
-        // List of directories to use. Directories are created
-        // if they do not exist.
-        directories:  ${drill.exec.spill.directories},
+      // -- The 2 options below can be used to override the common ones
+      // -- (common to all spilling operators)
+      // File system to use. Local file system by default.
+      fs: ${drill.exec.spill.fs},
+      // List of directories to use. Directories are created
+      // if they do not exist.
+      directories:  ${drill.exec.spill.directories},
     }
   },
   sort: {
@@ -370,7 +362,6 @@ drill.exec.options:  {
     debug.validate_iterators : false,
     debug.validate_vectors :false,
     drill.exec.functions.cast_empty_string_to_null: false,
-    drill.exec.hashagg.min_batches_per_partition : 3,
     # Setting to control if HashAgg should fallback to older behavior of consuming
     # unbounded memory. In case of 2 phase Agg when available memory is not enough
     # to start at least 2 partitions then HashAgg fallbacks to this case. It can be
@@ -388,8 +379,10 @@ drill.exec.options:  {
     exec.enable_bulk_load_table_list: false,
     exec.enable_union_type: false,
     exec.errors.verbose: false,
-    exec.hashagg.mem_limit : 0,
-    exec.hashagg.num_partitions :32,
+    exec.hashagg.mem_limit: 0,
+    exec.hashagg.min_batches_per_partition: 2,
+    exec.hashagg.num_partitions: 32,
+    exec.hashagg.use_memory_prediction: true,
     exec.impersonation.inbound_policies: "[]",
     exec.java.compiler.exp_in_method_size: 50,
     exec.java_compiler : "DEFAULT",