Posted to dev@drill.apache.org by Ben-Zvi <gi...@git.apache.org> on 2017/09/09 02:17:24 UTC

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

GitHub user Ben-Zvi opened a pull request:

    https://github.com/apache/drill/pull/938

    DRILL-5694: Handle HashAgg OOM by spill and retry, plus perf improvement

      The main change in this PR is adding a "_second way_" to handle memory pressure for the Hash Aggregate: basically, catch OOM failures when processing a new input row (during put() into the Hash Table), clean up internally to allow a retry (of the put()), and throw a new exception, "**RetryAfterSpillException**". In such a case the caller spills some partition to free more memory, and retries inserting that new row.
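
A minimal sketch of the spill-and-retry flow described above, not the actual Drill code. `TinyTable`, its slot-count notion of "memory", and `spillOnePartition()` are hypothetical stand-ins; only the exception name comes from the PR, and it is modeled here as a checked exception for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the exception named in the PR description.
class RetryAfterSpillException extends Exception {}

// Hypothetical hash-table stand-in: "memory" is a fixed number of row slots.
class TinyTable {
  private final int capacity;
  private final List<Integer> rows = new ArrayList<>();
  private int spills = 0;

  TinyTable(int capacity) {
    // With zero capacity a retry could never succeed, so reject it up front.
    if (capacity < 1) { throw new IllegalArgumentException("capacity must be >= 1"); }
    this.capacity = capacity;
  }

  // Simulated put(): fails before changing any state when there is no room.
  void put(int row) throws RetryAfterSpillException {
    if (rows.size() >= capacity) { throw new RetryAfterSpillException(); }
    rows.add(row);
  }

  // Simulated spill: frees every in-memory row and counts the spill.
  void spillOnePartition() {
    rows.clear();
    spills++;
  }

  // Caller side: on failure, spill to free memory, then retry the same row.
  void insertWithRetry(int row) {
    while (true) {
      try {
        put(row);
        return;
      } catch (RetryAfterSpillException e) {
        spillOnePartition();
      }
    }
  }

  int spillCount() { return spills; }
  int inMemoryRows() { return rows.size(); }
}
```

The important property is that a failed `put()` leaves the table unchanged, so retrying the same row after a spill is safe.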
       In addition, to reduce the risk of OOM when either creating the "Values Batch" (to match the "Keys Batch" in the Hash Table), or when allocating the Outgoing vectors (for the Values) -- there are new "_reserves_" -- one reserve for each of the two. A "_reserve_" is a memory amount subtracted from the memory-limit, which is added back to the limit just before it is needed, which should help prevent an OOM. After the allocation the code tries to restore that reserve (by subtracting it from the limit again, if possible). We always restore the "Outgoing Reserve" first; in case the "Values Batch" reserve runs empty just before calling put(), we skip the put() (just like an OOM there) and spill to free some memory (and restore that reserve).
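
The "reserve" idea can be sketched as follows, under the assumption of a toy allocator that tracks only a limit and an allocated total. `ToyAllocator` and `ReserveSketch` are hypothetical names, not Drill classes; the strict `>` comparison in `restoreReserve()` mirrors the check quoted from the diff later in this thread.

```java
// Hypothetical allocator stand-in: tracks only a limit and an allocated total.
class ToyAllocator {
  long limit;
  long allocated;

  ToyAllocator(long limit) { this.limit = limit; }

  // Returns false where a real allocator would raise an OOM.
  boolean allocate(long bytes) {
    if (allocated + bytes > limit) { return false; }
    allocated += bytes;
    return true;
  }
}

class ReserveSketch {
  private final ToyAllocator alloc;
  private final long reserveSize;
  private long reserve; // 0 means the reserve has been handed back to the limit

  ReserveSketch(ToyAllocator alloc, long reserveSize) {
    this.alloc = alloc;
    this.reserveSize = reserveSize;
    this.reserve = reserveSize;
    alloc.limit -= reserveSize; // set the reserve aside up front
  }

  // Just before a risky allocation: add the reserve back to the limit.
  void useReserve() {
    alloc.limit += reserve;
    reserve = 0;
  }

  // After the allocation: re-create the reserve only if enough memory is free.
  boolean restoreReserve() {
    if (reserve != 0) { return true; } // nothing to restore
    long avail = alloc.limit - alloc.allocated;
    if (avail <= reserveSize) { return false; } // caller should spill first
    alloc.limit -= reserveSize;
    reserve = reserveSize;
    return true;
  }

  boolean isEmpty() { return reserve == 0; }
}
```

When `restoreReserve()` returns false, the caller is expected to free memory (for example by spilling) and try the restore again.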
       The old "_first way_" is still used. That is the code that predicts the memory needs, and triggers a spill if not enough memory is available. The spill code was separated into a new method called spillIfNeeded() which is used in two modes - either the old way (prediction), or (when called from the new OOM catch code) with a flag to force a spill, regardless of available memory. That flag is also used to reduce the priority of the "current partition" when choosing a partition to spill.
    
      A new testing option was added (**hashagg_use_memory_prediction**, default true) - by setting this to false the old "first way" is disabled. This allows stress testing of the OOM handling code (which may not be used under normal memory allocation).
    
      The HashTable put() code was rewritten to clean up partial changes in case of an OOM, and the code around the call to put() was changed to catch the new exception, spill, and retry. Note that this works for 1st phase aggregation as well (return rows early).
    
    For the estimates (in addition to the old "max batch size" estimate) - there is an estimate for the Values Batch, and one for the Outgoing. These are used for restoring the "reserves". These estimates may be resized up in case actual allocations are bigger.
    
    Other changes:
    * Improved the "max batch size estimation" -- using the outgoing batch for getting the correct schema (instead of the input batch).
      The only information needed from the input batch is the "max average column size" (see change in RecordBatchSizer.java) to have a better estimate for VarChars.
      Also computed the size of those "no null" bigint columns added into the Values Batch when the aggregation is SUM, MIN or MAX (see changes in HashAggBatch.java and HashAggregator.java)
    * Using a "plain Java" subclass for the HashTable  because "byte manipulation" breaks on the new template code (see ChainedHashTable.java)
    * The three Configuration options were changed into System/Session options:   min_batches_per_partition , hashagg_max_memory , hashagg_num_partitions
    * There was a potential memory leak in the HashTable BatchHolder ctor (vectors were added to the container only after the successful allocation, and the container was cleared in case of OOM, so in case of a partial allocation the allocated part was not accessible). Also (Paul's suggestion) modified some vector templates to clean up after any runtime error (including an OOM).
    * Performance improvements: Eliminated the call to updateBatches() before each hash computation (instead used only when switching to a new SpilledRecordBatch); this was a big overhead.
       Also changed all the "setSafe" calls into "set" for the HashTable (those nanoseconds add up, especially when rehashing) - these bigint vectors need no resizing.
    * Ignore "(spill) file not found" error while cleaning up.
    * The unit tests were re-written in a more compact form. And a test with the new option (forcing the OOM code) was added (no memory prediction).


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ben-Zvi/drill DRILL-5694

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #938
    
----
commit 1a96bb39faf01b7665bd669d88494789693ed9b8
Author: Ben-Zvi <bb...@mapr.com>
Date:   2017-09-08T22:52:57Z

    DRILL-5694: Handle OOM in HashAggr by spill and retry, plus performance improvement

----


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939184
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -109,14 +107,21 @@
     
       private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
       private boolean is2ndPhase = false;
    -  private boolean canSpill = true; // make it false in case can not spill
    +  private boolean is1stPhase = false;
    +  private boolean canSpill = true; // make it false in case can not spill/return-early
       private ChainedHashTable baseHashTable;
       private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
       private int earlyPartition = 0; // which partition to return early
    -
    -  private long memoryLimit; // max memory to be used by this oerator
    -  private long estMaxBatchSize = 0; // used for adjusting #partitions
    -  private long estRowWidth = 0;
    +  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
    --- End diff --
    
    As it turns out, unlike C++, Java is pretty good at initializing booleans to false and longs to 0. We only need to explicitly initialize values when the value should be other than 0/false/null.
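
A small illustration of the point above: Java gives instance fields default values, so only non-default initial values need an explicit initializer. `DefaultsDemo` is a hypothetical class; note that this applies to fields, not to local variables, which must be assigned before use.

```java
class DefaultsDemo {
  boolean earlyOutput;      // defaults to false
  long estMaxBatchSize;     // defaults to 0
  Object baseHashTable;     // defaults to null
  boolean canSpill = true;  // explicit initializer needed: differs from the default
}
```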


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138236250
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
    @@ -92,18 +92,20 @@
     
       // Hash Aggregate Options
     
    -  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
       String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
       LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
    -  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
       String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
       LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
       // min batches is used for tuning (each partition needs so many batches when planning the number of partitions,
       // or reserve this number when calculating whether the remaining available memory is too small and requires a spill.)
       // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
    -  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
    -  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
    -  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
    +  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
    +  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
    +  // Can be turns off mainly for testing. Memory prediction is used to decide on when to spill to disk; with this option off,
    --- End diff --
    
    Done


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r140062742
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    +      // along with the error message.
    +      File spinFile = new File("/tmp/drillspin");
    --- End diff --
    
    Using a "flag file" instead of a config setting gives more flexibility: there is no need to restart in order to turn this feature on/off, one can choose to catch errors on only a few nodes, and finally, the looping thread can be freed by deleting this "flag file".
    I also plan on posting an announcement on the dev list about this new "feature", and see if there's any feedback.
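
A rough sketch of the flag-file idea, assuming a temp file in place of the fixed '/tmp/drillspin' path. `FlagFileSpin` and its methods are hypothetical; the `maxPolls` bound is an addition for the sketch so that it always terminates, whereas the code under review loops for as long as the file exists.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class FlagFileSpin {
  // Creates a flag file in the default temp directory (stand-in for /tmp/drillspin).
  static Path createFlag() {
    try {
      return Files.createTempFile("spinflag", ".tmp");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Sleeps in a loop while the flag file exists; returns the number of polls done.
  // Deleting the file from outside (e.g. a shell) releases the waiting thread.
  static int spinWhileExists(Path flag, long pollMillis, int maxPolls) {
    int polls = 0;
    while (polls < maxPolls && Files.exists(flag)) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
        return polls;
      }
      polls++;
    }
    return polls;
  }
}
```

Because the check is a plain file-existence test, the feature can be switched on or off per node at run time without a restart.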



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138494777
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
         hashCode >>>= bitsInMask;
         HashTable.PutStatus putStatus = null;
         long allocatedBeforeHTput = allocator.getAllocatedMemory();
    -
         // ==========================================
         // Insert the key columns into the hash table
         // ==========================================
    +    boolean noReserveMem = reserveValueBatchMemory == 0;
         try {
    +      if ( noReserveMem && canSpill ) { throw new RetryAfterSpillException();} // proactive spill, skip put()
    +
           putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +
    +    } catch (RetryAfterSpillException re) {
    --- End diff --
    
    Done. 


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138236176
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java ---
    @@ -0,0 +1,32 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.common.exceptions;
    +
    +import org.apache.drill.exec.proto.UserBitShared;
    +
    +/**
    + *  A special exception to be caught by caller, who is supposed to free memory by spilling and try again
    + *
    + */
    +public class RetryAfterSpillException extends UserException {
    --- End diff --
    
    Done


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939427
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
         hashCode >>>= bitsInMask;
         HashTable.PutStatus putStatus = null;
         long allocatedBeforeHTput = allocator.getAllocatedMemory();
    -
         // ==========================================
         // Insert the key columns into the hash table
         // ==========================================
    +    boolean noReserveMem = reserveValueBatchMemory == 0;
         try {
    +      if ( noReserveMem && canSpill ) { throw new RetryAfterSpillException();} // proactive spill, skip put()
    +
           putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +
    +    } catch (RetryAfterSpillException re) {
    --- End diff --
    
    See above. Should be a checked exception declared by `put()`.
    
    Also, why do we need to throw an exception before calling `put` only to catch it a couple of lines later?
    
    If the spill code was in a method, rather than just inline, seems we could do:
    
    ```
    if ( noReserveMem && canSpill ) { doSpill(); }
    try {
      ...put(...)
    } catch (...) {
       doSpill();
    }
    ```
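
A compilable expansion of the snippet above, as a sketch only. `ProactiveSpillSketch`, `PutFailed`, and the `failNextPut` switch are hypothetical; the point is that the proactive path calls `doSpill()` directly and never constructs an exception.

```java
// Hypothetical checked exception standing in for a failed put().
class PutFailed extends Exception {}

class ProactiveSpillSketch {
  long reserve;                 // 0 means the reserve is exhausted
  boolean canSpill = true;
  boolean failNextPut = false;  // test hook: makes the next put() fail once
  int spills = 0;
  int exceptionsThrown = 0;

  // Simulated spill: frees memory and refills the reserve.
  void doSpill() {
    spills++;
    reserve = 1;
  }

  // Simulated put(): throws only when told to fail.
  void put(int row) throws PutFailed {
    if (failNextPut) {
      failNextPut = false;
      exceptionsThrown++;
      throw new PutFailed();
    }
  }

  void insert(int row) {
    if (reserve == 0 && canSpill) { doSpill(); } // proactive spill, no exception involved
    try {
      put(row);
    } catch (PutFailed e) {
      doSpill(); // reactive spill; a real caller would now retry the same row
    }
  }
}
```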


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939253
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -382,19 +390,25 @@ private void delayedSetup() {
         final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
     
         // Set the number of partitions from the configuration (raise to a power of two, if needed)
    -    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
    -    if ( numPartitions == 1 ) {
    +    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
    +    if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early return with 1 partition
           canSpill = false;
           logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
         }
         numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
     
    -    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
         else {
           // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
           updateEstMaxBatchSize(incoming);
         }
    -    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    // create "reserved memory" and adjust the memory limit down
    +    reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
    +    long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - reserveOutgoingMemory ;
    +    long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
    +    if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little memory available"); }
    +    allocator.setLimit(newMemoryLimit);
    +
    --- End diff --
    
    This code has grown to be incredibly complex with many, many paths through the various functions.
    
    Tests are handy things. Do we have system-level unit tests that exercise each path through the code? Otherwise, as a reviewer, how can I be sure that each execution path does, in fact, work?


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939361
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1335,7 +1470,7 @@ private void updateStats(HashTable[] htables) {
         }
         if ( rowsReturnedEarly > 0 ) {
           stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
    -          (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
    +          (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
    --- End diff --
    
    This file is a template. This means, we copy *all* this code each time we generate a new class. How is doing so helping stability, customer value or performance? Should all this code be in a template that is copied on every query? Or, should it be refactored into a driver class, with only a very light wrapper appearing in the copied template?
    
    As this code gets ever more complex, it puts a strain on the Java code that must walk through this code and do method fixup, scalar replacements, etc. That work takes time. What value accrues to the user from doing this fixup on code that never changes from one query to the next?
    
    Filed [DRILL-5779](https://issues.apache.org/jira/browse/DRILL-5779) for this issue.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939459
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
         hashCode >>>= bitsInMask;
         HashTable.PutStatus putStatus = null;
         long allocatedBeforeHTput = allocator.getAllocatedMemory();
    -
         // ==========================================
         // Insert the key columns into the hash table
         // ==========================================
    +    boolean noReserveMem = reserveValueBatchMemory == 0;
         try {
    +      if ( noReserveMem && canSpill ) { throw new RetryAfterSpillException();} // proactive spill, skip put()
    +
           putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +
    +    } catch (RetryAfterSpillException re) {
    +      if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
    --- End diff --
    
    This is the message sent to the log and user. Should we explain why we can't spill? And, what to do? Something like:
    
    "Incoming batch too large and no in-memory partitions to spill. Increase memory assigned to the Hash Agg."
    
    Replace the above wording with the actual reasons and fixes.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939481
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---
    @@ -47,10 +47,7 @@
       // OK - batch returned, NONE - end of data, RESTART - call again
       public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
     
    -  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
    -                             OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
    -                             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
    -                             VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
    +  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
    --- End diff --
    
    Not sure that putting all items on one big line is an improvement over the arg-per-line format previously.
    
    Also, see note above: a large number of arguments suggest a muddy design with one class trying to do far too much.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139045329
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -545,16 +584,19 @@ public AggOutcome doWork() {
           if (EXTRA_DEBUG_1) {
             logger.debug("Starting outer loop of doWork()...");
           }
    -      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +      while (underlyingIndex < currentBatchRecordCount) {
             if (EXTRA_DEBUG_2) {
               logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
             }
             checkGroupAndAggrValues(currentIndex);
    +
    +        if ( retrySameIndex ) { retrySameIndex = false; }  // need to retry this row (e.g. we had an OOM)
    --- End diff --
    
    So why does "or before" have spaces ? :-)  


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139877726
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    +      // along with the error message.
    +      File spinFile = new File("/tmp/drillspin");
    --- End diff --
    
    Should this be a config setting? Probably the config is not visible here, but can we set a static variable at start-up time? And, since this code will check the file system on every exception, should we have a config variable to turn on the check?
    
    Feel free to tell me I'm being overly paranoid...


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138495296
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---
    @@ -58,7 +59,7 @@
     
       public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
     
    -  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
    +  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
    --- End diff --
    
    Done.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r140062933
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    +      // along with the error message.
    +      File spinFile = new File("/tmp/drillspin");
    +      if ( spinFile.exists() ) {
    +        File tmpDir = new File("/tmp");
    +        File outErr = null;
    +        try {
    +          outErr = File.createTempFile("spin", ".tmp", tmpDir);
    +          BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
    +          bw.write("Spinning process: " + ManagementFactory.getRuntimeMXBean().getName()
    +          /* After upgrading to JDK 9 - replace with: ProcessHandle.current().getPid() */);
    +          bw.write("\nError cause: " +
    +            (errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: " + ErrorHelper.getRootMessage(cause)) : message));
    +          bw.close();
    +        } catch (Exception ex) {
    +          logger.warn("Failed creating a spinner tmp message file: {}", ex);
    +        }
    +        while (spinFile.exists()) {
    +          try { sleep(1_000); } catch (Exception ex) { /* ignore interruptions */ }
    --- End diff --
    
     Does query killing cause a user exception ?



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi closed the pull request at:

    https://github.com/apache/drill/pull/938


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939296
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -545,16 +584,19 @@ public AggOutcome doWork() {
           if (EXTRA_DEBUG_1) {
             logger.debug("Starting outer loop of doWork()...");
           }
    -      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +      while (underlyingIndex < currentBatchRecordCount) {
             if (EXTRA_DEBUG_2) {
               logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
             }
             checkGroupAndAggrValues(currentIndex);
    +
    +        if ( retrySameIndex ) { retrySameIndex = false; }  // need to retry this row (e.g. we had an OOM)
    --- End diff --
    
    I think Drill's coding style guidelines say no spaces after ( or before ).


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939203
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -109,14 +107,21 @@
     
       private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
       private boolean is2ndPhase = false;
    -  private boolean canSpill = true; // make it false in case can not spill
    +  private boolean is1stPhase = false;
    +  private boolean canSpill = true; // make it false in case can not spill/return-early
       private ChainedHashTable baseHashTable;
       private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
       private int earlyPartition = 0; // which partition to return early
    -
    -  private long memoryLimit; // max memory to be used by this oerator
    -  private long estMaxBatchSize = 0; // used for adjusting #partitions
    -  private long estRowWidth = 0;
    +  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
    +  private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
    +  private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
    +  private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
    +  private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
    +  private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
    +  private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
    +  private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
    +  private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
    +  private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
    --- End diff --
    
    Long lists of member variables are generally frowned upon. Can't unit test them. Too many states to keep in mind. Can these be grouped into a read-only config class (set up front, then never changed) vs, running estimates?
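
One possible shape for that grouping, as a sketch only. `AggConfig` and `AggEstimates` are hypothetical class names; the field names echo the diff above, but which fields belong in which group is an assumption made for illustration.

```java
// Read-only settings: assigned once in the constructor, never changed afterwards.
final class AggConfig {
  final boolean isTwoPhase;
  final boolean is2ndPhase;
  final boolean useMemoryPrediction;

  AggConfig(boolean isTwoPhase, boolean is2ndPhase, boolean useMemoryPrediction) {
    this.isTwoPhase = isTwoPhase;
    this.is2ndPhase = is2ndPhase;
    this.useMemoryPrediction = useMemoryPrediction;
  }
}

// Running estimates: mutable, but only through methods that can be unit tested.
final class AggEstimates {
  private long valuesBatchSize;
  private long outgoingAllocSize;

  // Estimates only grow, e.g. when an actual allocation was bigger than predicted.
  void observeValuesBatch(long actualBytes) {
    valuesBatchSize = Math.max(valuesBatchSize, actualBytes);
  }

  void observeOutgoingAlloc(long actualBytes) {
    outgoingAllocSize = Math.max(outgoingAllocSize, actualBytes);
  }

  long valuesBatchSize() { return valuesBatchSize; }
  long outgoingAllocSize() { return outgoingAllocSize; }
}
```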


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939496
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---
    @@ -58,7 +59,7 @@
     
       public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
     
    -  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
    +  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
    --- End diff --
    
    At present, `RetryAfterSpillException` is unchecked, so it is not necessary to declare. But, change `RetryAfterSpillException` to extend `Exception` (checked) and this declaration then becomes useful.
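
To illustrate the difference, here is a minimal sketch with two hypothetical exception classes, `CheckedRetry` and `UncheckedRetry`. With the checked variant the compiler forces callers to catch or declare the exception; with the unchecked variant a caller can forget to handle it and the code still compiles.

```java
class CheckedRetry extends Exception {}            // checked: must be caught or declared
class UncheckedRetry extends RuntimeException {}   // unchecked: a "throws" clause is optional

class CheckedVsUnchecked {
  static void putChecked(boolean fail) throws CheckedRetry {
    if (fail) { throw new CheckedRetry(); }
  }

  static void putUnchecked(boolean fail) {
    if (fail) { throw new UncheckedRetry(); }
  }

  // Would not compile without the try/catch (or a throws clause).
  static String callChecked(boolean fail) {
    try {
      putChecked(fail);
      return "ok";
    } catch (CheckedRetry e) {
      return "spill-and-retry";
    }
  }

  // Compiles without any handling, so a failure escapes to the caller's caller.
  static String callUnchecked(boolean fail) {
    putUnchecked(fail);
    return "ok";
  }
}
```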


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939336
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -646,6 +687,46 @@ public AggOutcome doWork() {
       }
     
       /**
    +   *   Use reserved values memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedValuesMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveValueBatchMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveValueBatchMemory = 0;
    +  }
    +  /**
    +   *   Use reserved outgoing output memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedOutgoingMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveOutgoingMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveOutgoingMemory = 0;
    +  }
    +  /**
    +   *  Restore the reserve memory (both)
    +   *
    +   */
    +  private void restoreReservedMemory() {
    +    if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
    +      long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
    +      if ( memAvail > estOutgoingAllocSize) {
    +        allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
    --- End diff --
    
    Done this way, it is necessary for this reviewer to mentally track the current allocator limit. Can we ever subtract an amount twice? Add it twice?
    
    Now, it seems we should not alter the allocator. But, if we do, shouldn't it be based on absolute amounts, not relative deltas?
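
One way to read the "absolute amounts" suggestion, sketched under assumptions: keep the operator's granted limit fixed and derive the effective limit from flags that say which reserves are currently held, so that using or restoring a reserve twice cannot double-count. `ReserveSketch` and its methods are hypothetical names, not part of Drill's allocator API.

```java
// Hypothetical sketch; not Drill's allocator API.
class ReserveSketch {
  private final long baseLimit;        // fixed memory granted to the operator
  private final long valuesReserve;    // absolute size of the Values Batch reserve
  private final long outgoingReserve;  // absolute size of the Outgoing reserve
  private boolean valuesHeld = true;
  private boolean outgoingHeld = true;

  ReserveSketch(long baseLimit, long valuesReserve, long outgoingReserve) {
    this.baseLimit = baseLimit;
    this.valuesReserve = valuesReserve;
    this.outgoingReserve = outgoingReserve;
  }

  // The limit is always recomputed from absolute amounts, never adjusted by deltas.
  long effectiveLimit() {
    return baseLimit
        - (valuesHeld ? valuesReserve : 0)
        - (outgoingHeld ? outgoingReserve : 0);
  }

  void useValuesReserve()   { valuesHeld = false; }    // idempotent
  void useOutgoingReserve() { outgoingHeld = false; }  // idempotent

  // Re-hold reserves when there is enough headroom; Outgoing first, as in the PR.
  void restore(long allocated) {
    if (!outgoingHeld && effectiveLimit() - allocated > outgoingReserve) {
      outgoingHeld = true;
    }
    if (!valuesHeld && effectiveLimit() - allocated > valuesReserve) {
      valuesHeld = true;
    }
  }
}
```

The value returned by `effectiveLimit()` could then be passed to a single `setLimit`-style call, so a reader does not need to track the running limit mentally.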


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138492663
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -646,6 +687,46 @@ public AggOutcome doWork() {
       }
     
       /**
    +   *   Use reserved values memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedValuesMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveValueBatchMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveValueBatchMemory = 0;
    +  }
    +  /**
    +   *   Use reserved outgoing output memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedOutgoingMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveOutgoingMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveOutgoingMemory = 0;
    +  }
    +  /**
    +   *  Restore the reserve memory (both)
    +   *
    +   */
    +  private void restoreReservedMemory() {
    +    if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
    +      long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
    +      if ( memAvail > estOutgoingAllocSize) {
    +        allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
    --- End diff --
    
    Can never add twice, as the code only adds to an empty ( == 0 ) reserve.
    And these are not "relative deltas", but the actual expected batch size (so that the following allocation would not OOM).



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939122
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
    @@ -92,18 +92,20 @@
     
       // Hash Aggregate Options
     
    -  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
       String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
       LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
    -  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
       String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
       LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
       // min batches is used for tuning (each partition needs so many batches when planning the number of partitions,
       // or reserve this number when calculating whether the remaining available memory is too small and requires a spill.)
       // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
    -  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
    -  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
    -  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
    +  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
    +  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
    +  // Can be turns off mainly for testing. Memory prediction is used to decide on when to spill to disk; with this option off,
    --- End diff --
    
    turns --> turned


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139877079
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    --- End diff --
    
    Would also recommend `/tmp/drill/spin...`.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r140098546
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    --- End diff --
    
    Done 


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939223
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -297,10 +302,7 @@ public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowId
       }
     
       @Override
    -  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
    -                    OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
    -                    LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
    -                    VectorContainer outContainer) throws SchemaChangeException, IOException {
    +  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
    --- End diff --
    
    Methods and constructors with many arguments are generally frowned upon, as they suggest that a single class is trying to do too much: it has too much internal coupling and performs tasks that should be broken apart. Can this single, huge class be split into smaller, more focused abstractions?


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939532
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---
    @@ -158,19 +158,17 @@ public BatchHolder(int idx) {
           } finally {
             if (!success) {
               htContainer.clear();
    -          if (links != null) {
    -            links.clear();
    -          }
    +          if (links != null) { links.clear();}
             }
           }
         }
     
         private void init(IntVector links, IntVector hashValues, int size) {
           for (int i = 0; i < size; i++) {
    -        links.getMutator().setSafe(i, EMPTY_SLOT);
    +        links.getMutator().set(i, EMPTY_SLOT);
    --- End diff --
    
    Is size ever less than the vector capacity()? Else, you can just ask the vector for its capacity.
    
    The `links.getMutator()` call in an inner loop is inefficient.
    
    Instead of a single function initializing two `IntVector`s with redundant code, can this be refactored to have a function that initializes one vector, that is called twice?
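
A rough sketch of the refactoring suggested above, using a hypothetical `SlotMutator` interface backed by an `int[]` as a stand-in for the real vector mutator. The value of `EMPTY_SLOT` is also an assumption made only for this sketch.

```java
// Stand-in for a vector mutator; hypothetical, not Drill's IntVector.Mutator.
interface SlotMutator {
  void set(int index, int value);
}

// Array-backed implementation used only to make the sketch runnable.
class ArrayMutator implements SlotMutator {
  final int[] slots;
  ArrayMutator(int size) { slots = new int[size]; }
  public void set(int index, int value) { slots[index] = value; }
}

class VectorInit {
  static final int EMPTY_SLOT = -1; // assumed sentinel value

  // Initializes ONE vector. The caller obtains the mutator once,
  // so no accessor call is repeated inside the loop.
  static void fill(SlotMutator mutator, int size, int value) {
    for (int i = 0; i < size; i++) {
      mutator.set(i, value);
    }
  }
}
```

The two-vector `init()` would then reduce to two calls to `fill(...)`, one per vector, each with its own initial value.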


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139045744
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1335,7 +1470,7 @@ private void updateStats(HashTable[] htables) {
         }
         if ( rowsReturnedEarly > 0 ) {
           stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
    -          (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
    +          (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
    --- End diff --
    
    Work will be done later as part of DRILL-5779 


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138236560
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
    @@ -293,7 +299,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException,
             aggrExprs,
             cgInner.getWorkspaceTypes(),
             groupByOutFieldIds,
    -        this.container);
    +        this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
    --- End diff --
    
    Not sure .... seemed to work OK in some (limited) testing.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139045903
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---
    @@ -47,10 +47,7 @@
       // OK - batch returned, NONE - end of data, RESTART - call again
       public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
     
    -  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
    -                             OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
    -                             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
    -                             VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
    +  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
    --- End diff --
    
    That was one of the IDE's ideas ....
    And simplification could be done as part of future cleanup work (like DRILL-5779)


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138433967
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
        */
       private void updateEstMaxBatchSize(RecordBatch incoming) {
         if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
         RecordBatchSizer sizer = new RecordBatchSizer(incoming);
         logger.trace("Incoming sizer: {}",sizer);
         // An empty batch only has the schema, can not tell actual length of varchars
         // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
     
         // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
         maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
     
    -    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff --
    
    Most of these estimates are for internal "worst case".  Only the "outgoing" one is for the outgoing batch (which is also for spilling - which is internal).
       Anyway all these estimates have nothing to do with _throttling_ the outgoing batch size; that logic was not changed from the original code (likely up to MAX_BATCH_SIZE). 
      Making such a change should be a separate project. 



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138435187
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -382,19 +390,25 @@ private void delayedSetup() {
         final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
     
         // Set the number of partitions from the configuration (raise to a power of two, if needed)
    -    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
    -    if ( numPartitions == 1 ) {
    +    numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
    +    if ( numPartitions == 1 && is2ndPhase  ) { // 1st phase can still do early return with 1 partition
           canSpill = false;
           logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
         }
         numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
     
    -    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
         else {
           // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
           updateEstMaxBatchSize(incoming);
         }
    -    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    // create "reserved memory" and adjust the memory limit down
    +    reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
    +    long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - reserveOutgoingMemory ;
    +    long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
    +    if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little memory available"); }
    +    allocator.setLimit(newMemoryLimit);
    +
    --- End diff --
    
    Yes indeed. The only attempt to force a code path is the new testing option *use_memory_prediction*, which can disable the estimate-based prediction (of when to spill), hence forcing the code path that relies on an OOM (for hash table put() ) to take place (one unit test was added for that).
        Getting full code coverage would be ideal, but hard.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939168
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---
    @@ -293,7 +299,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException,
             aggrExprs,
             cgInner.getWorkspaceTypes(),
             groupByOutFieldIds,
    -        this.container);
    +        this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
    --- End diff --
    
    If the `BigInt` column is used to indicate nulls, then each value is of size 9. In addition, each vector has, on average, 25% internal fragmentation. To account for this, perhaps assume that the average size is 12 or 13 bytes.
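
The arithmetic behind that figure can be worked through as follows. This is a sketch that assumes "25% internal fragmentation" means a quarter of the allocated vector space is unused; `WidthEstimate` is a hypothetical helper, not Drill code.

```java
// Hypothetical helper for the estimate discussed above.
class WidthEstimate {
  // dataBytes:     width of the value itself (8 for a BigInt)
  // nullBytes:     per-value overhead of the null indicator (assumed 1)
  // fragmentation: fraction of allocated vector space left unused (assumed 0.25)
  static int paddedWidth(int dataBytes, int nullBytes, double fragmentation) {
    double used = dataBytes + nullBytes;           // 8 + 1 = 9 bytes actually used
    return (int) Math.ceil(used / (1.0 - fragmentation)); // 9 / 0.75 = 12
  }
}
```

Under these assumptions `paddedWidth(8, 1, 0.25)` gives 12 bytes per extra column, versus the 8 used in the diff.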


---

[GitHub] drill issue #938: DRILL-5694: Handle HashAgg OOM by spill and retry, plus pe...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on the issue:

    https://github.com/apache/drill/pull/938
  
      These tests passed for me (embedded, on the Mac) after squashing and rebasing yesterday.
    
    Were these failures seen on the cluster ?
    
    The test that "failed" was supposed to hit a failure internally (user error - not enough memory).
    
    
    
    ________________________________
    From: Paul Rogers <no...@github.com>
    Sent: Thursday, September 21, 2017 10:03:57 PM
    To: apache/drill
    Cc: Boaz Ben-Zvi; Mention
    Subject: Re: [apache/drill] DRILL-5694: Handle HashAgg OOM by spill and retry, plus perf improvement (#938)
    
    
    @Ben-Zvi<https://github.com/ben-zvi>, unit tests failed with these errors:
    
    Failed tests:
      TestHashAggrSpill.testHashAggrFailWithFallbackDisabed:165 null
    
    Tests in error:
      TestHashAggrSpill.testNoPredictHashAggrSpill:135->testSpill:110 » IllegalState
      TestHashAggrSpill.testHashAggrSecondaryTertiarySpill:147->testSpill:110 » IllegalState
      TestHashAggrSpill.testSimpleHashAggrSpill:123->testSpill:110 » IllegalState Cl...
      TestHashAggrSpill.testHashAggrSuccessWithFallbackEnabled:179->testSpill:110 » IllegalState
    
    
    Is this expected at this stage? Are these due to the test framework issue we discussed recently?
    



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139045072
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -109,14 +107,21 @@
     
       private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
       private boolean is2ndPhase = false;
    -  private boolean canSpill = true; // make it false in case can not spill
    +  private boolean is1stPhase = false;
    +  private boolean canSpill = true; // make it false in case can not spill/return-early
       private ChainedHashTable baseHashTable;
       private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
       private int earlyPartition = 0; // which partition to return early
    -
    -  private long memoryLimit; // max memory to be used by this oerator
    -  private long estMaxBatchSize = 0; // used for adjusting #partitions
    -  private long estRowWidth = 0;
    +  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
    +  private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
    +  private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
    +  private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
    +  private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
    +  private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
    +  private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
    +  private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
    +  private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
    +  private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
    --- End diff --
    
    Will wait for some future cleanup opportunity.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138495914
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---
    @@ -158,19 +158,17 @@ public BatchHolder(int idx) {
           } finally {
             if (!success) {
               htContainer.clear();
    -          if (links != null) {
    -            links.clear();
    -          }
    +          if (links != null) { links.clear();}
             }
           }
         }
     
         private void init(IntVector links, IntVector hashValues, int size) {
           for (int i = 0; i < size; i++) {
    -        links.getMutator().setSafe(i, EMPTY_SLOT);
    +        links.getMutator().set(i, EMPTY_SLOT);
    --- End diff --
    
    This init() method is not used .... looks like leftover old code 


---

[GitHub] drill issue #938: DRILL-5694: Handle HashAgg OOM by spill and retry, plus pe...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/938
  
    These failures were seen on the QA cluster. Look for build "MERGE-070921-1".
    
    The team just cleaned up a bunch of failing tests; I hesitate to introduce a commit that causes a new set to fail. Can you investigate please?


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138236706
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -109,14 +107,21 @@
     
       private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
       private boolean is2ndPhase = false;
    -  private boolean canSpill = true; // make it false in case can not spill
    +  private boolean is1stPhase = false;
    +  private boolean canSpill = true; // make it false in case can not spill/return-early
       private ChainedHashTable baseHashTable;
       private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
       private int earlyPartition = 0; // which partition to return early
    -
    -  private long memoryLimit; // max memory to be used by this oerator
    -  private long estMaxBatchSize = 0; // used for adjusting #partitions
    -  private long estRowWidth = 0;
    +  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
    --- End diff --
    
    This is more for code readability -- "by default, this flag was chosen to be false".   


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939319
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -646,6 +687,46 @@ public AggOutcome doWork() {
       }
     
       /**
    +   *   Use reserved values memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedValuesMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveValueBatchMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveValueBatchMemory = 0;
    +  }
    +  /**
    +   *   Use reserved outgoing output memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedOutgoingMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveOutgoingMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    --- End diff --
    
    Why is it necessary to change the allocator limit? The allocator limit should be fixed: it is the amount of memory given to this operator. Shouldn't the code use its own, internal, limits to make decisions? That is, if allocated memory + some expected use > a defined internal size, then spill?
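
As an illustration of the alternative described above, the spill decision can be made against an internal budget while the allocator limit is never touched. This is only a sketch: `SpillDecision`, its parameters, and the idea of deriving the budget by subtracting reserves are assumptions, not code from the PR.

```java
// Hypothetical sketch: decide when to spill without changing the allocator limit.
class SpillDecision {
  // Internal budget: the fixed operator limit minus whatever is set aside as reserves.
  static long internalLimit(long operatorLimit, long valuesReserve, long outgoingReserve) {
    return operatorLimit - valuesReserve - outgoingReserve;
  }

  // Spill when current allocation plus the expected next allocation exceeds the budget.
  static boolean shouldSpill(long allocated, long expectedUse, long internalLimit) {
    return allocated + expectedUse > internalLimit;
  }
}
```

The trade-off is that such a check only predicts memory pressure, whereas the PR's approach of lowering the allocator limit makes the allocator itself enforce the reserve.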


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939287
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
        */
       private void updateEstMaxBatchSize(RecordBatch incoming) {
         if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
         RecordBatchSizer sizer = new RecordBatchSizer(incoming);
         logger.trace("Incoming sizer: {}",sizer);
         // An empty batch only has the schema, can not tell actual length of varchars
         // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
     
         // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
         maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
     
    -    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff --
    
    Here, the output batch size is fixed based on the number of rows. Suppose a sort sits downstream of this operator, and the sort has a memory ceiling of x MB. Can the code here create batches larger than x/2 MB, so that the sort is forced to consume batches too large for it to buffer two and spill?
    
    In other words, is there an attempt here to control overall output batch memory use instead of just assuming that we always output `MAX_BATCH_SIZE` rows regardless of memory use?
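    
    To make the question concrete, here is a minimal sketch (not Drill code) of capping the row count by a byte budget as well as by the row limit. The class name, `rowsPerOutputBatch()` and `maxOutputBatchBytes` are hypothetical, and the 65536-row ceiling is only an assumed value for illustration:
    
    ```java
    public final class OutputBatchSizing {
      // Assumed per-batch row ceiling; the value is illustrative only.
      static final int MAX_BATCH_SIZE = 65536;
    
      /** Rows to emit so that rows * estOutputRowWidth stays within the byte budget. */
      static int rowsPerOutputBatch(long estOutputRowWidth, long maxOutputBatchBytes) {
        if (estOutputRowWidth <= 0) { return MAX_BATCH_SIZE; }
        long rowsThatFit = maxOutputBatchBytes / estOutputRowWidth;
        // Always emit at least one row, and never more than the row ceiling.
        return (int) Math.max(1, Math.min(MAX_BATCH_SIZE, rowsThatFit));
      }
    }
    ```
    
    With something along these lines, wide rows would shrink the batch row count instead of growing the batch's memory footprint.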


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139877947
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    +      // along with the error message.
    +      File spinFile = new File("/tmp/drillspin");
    +      if ( spinFile.exists() ) {
    +        File tmpDir = new File("/tmp");
    +        File outErr = null;
    +        try {
    +          outErr = File.createTempFile("spin", ".tmp", tmpDir);
    +          BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
    +          bw.write("Spinning process: " + ManagementFactory.getRuntimeMXBean().getName()
    +          /* After upgrading to JDK 9 - replace with: ProcessHandle.current().getPid() */);
    +          bw.write("\nError cause: " +
    +            (errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: " + ErrorHelper.getRootMessage(cause)) : message));
    +          bw.close();
    +        } catch (Exception ex) {
    +          logger.warn("Failed creating a spinner tmp message file: {}", ex);
    +        }
    +        while (spinFile.exists()) {
    +          try { sleep(1_000); } catch (Exception ex) { /* ignore interruptions */ }
    --- End diff --
    
    What happens if the fragment executor tries to kill the query? Do we want the spinner to ignore that request here?
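    
    For comparison, a minimal sketch of a spin loop that stops waiting when its thread is interrupted instead of swallowing the interruption. This assumes the kill request reaches the spinning thread as a thread interrupt, which is an assumption here and not something verified against the fragment executor; `Spinner` and `spinWhileExists()` are hypothetical names:
    
    ```java
    import java.io.File;
    
    public final class Spinner {
      /**
       * Blocks while spinFile exists. Returns true if the file was removed,
       * false if the thread was interrupted first.
       */
      static boolean spinWhileExists(File spinFile, long pollMillis) {
        while (spinFile.exists()) {
          try {
            Thread.sleep(pollMillis);
          } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the request.
            Thread.currentThread().interrupt();
            return false;
          }
        }
        return true;
      }
    }
    ```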


---

[GitHub] drill issue #938: DRILL-5694: Handle HashAgg OOM by spill and retry, plus pe...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/938
  
    @Ben-Zvi, unit tests failed with these errors:
    ```
    Failed tests: 
      TestHashAggrSpill.testHashAggrFailWithFallbackDisabed:165 null
    
    Tests in error: 
      TestHashAggrSpill.testNoPredictHashAggrSpill:135->testSpill:110 » IllegalState
      TestHashAggrSpill.testHashAggrSecondaryTertiarySpill:147->testSpill:110 » IllegalState
      TestHashAggrSpill.testSimpleHashAggrSpill:123->testSpill:110 » IllegalState Cl...
      TestHashAggrSpill.testHashAggrSuccessWithFallbackEnabled:179->testSpill:110 » IllegalState
    ```
    Is this expected at this stage? Are these due to the test framework issue we discussed recently?


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138240616
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -297,10 +302,7 @@ public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowId
       }
     
       @Override
    -  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
    -                    OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
    -                    LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
    -                    VectorContainer outContainer) throws SchemaChangeException, IOException {
    +  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
    --- End diff --
    
    Removed one argument, "stats"; it can be taken from "oContext".


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r139877011
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    --- End diff --
    
    Would recommend `/tmp/drill/spin`. We already use `/tmp/drill` for other items, so this keeps things tidy.


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r140093627
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    +      // Can be used to attach a debugger, use jstack, etc
    +      // The processID of the spinning thread should be in a file like /tmp/spin4148663301172491613.tmp
    +      // along with the error message.
    +      File spinFile = new File("/tmp/drillspin");
    +      if ( spinFile.exists() ) {
    +        File tmpDir = new File("/tmp");
    +        File outErr = null;
    +        try {
    +          outErr = File.createTempFile("spin", ".tmp", tmpDir);
    +          BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
    +          bw.write("Spinning process: " + ManagementFactory.getRuntimeMXBean().getName()
    +          /* After upgrading to JDK 9 - replace with: ProcessHandle.current().getPid() */);
    +          bw.write("\nError cause: " +
    +            (errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: " + ErrorHelper.getRootMessage(cause)) : message));
    +          bw.close();
    +        } catch (Exception ex) {
    +          logger.warn("Failed creating a spinner tmp message file: {}", ex);
    +        }
    +        while (spinFile.exists()) {
    +          try { sleep(1_000); } catch (Exception ex) { /* ignore interruptions */ }
    --- End diff --
    
    Yes - if some non-blocked part tries to kill the query, the spinning parts would still be blocked. That may be by design, as debugging is still going on (until a user issues "clush -a rm /tmp/drill/spin").



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138495164
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -1178,20 +1273,38 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
         hashCode >>>= bitsInMask;
         HashTable.PutStatus putStatus = null;
         long allocatedBeforeHTput = allocator.getAllocatedMemory();
    -
         // ==========================================
         // Insert the key columns into the hash table
         // ==========================================
    +    boolean noReserveMem = reserveValueBatchMemory == 0;
         try {
    +      if ( noReserveMem && canSpill ) { throw new RetryAfterSpillException();} // proactive spill, skip put()
    +
           putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +
    +    } catch (RetryAfterSpillException re) {
    +      if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
    --- End diff --
    
    The method getOOMErrorMsg() does all this explanation ...


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138437442
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -646,6 +687,46 @@ public AggOutcome doWork() {
       }
     
       /**
    +   *   Use reserved values memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedValuesMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveValueBatchMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    +
    +    reserveValueBatchMemory = 0;
    +  }
    +  /**
    +   *   Use reserved outgoing output memory (if available) to try and preemp an OOM
    +   */
    +  private void useReservedOutgoingMemory() {
    +    // try to preempt an OOM by using the reserved memory
    +    long reservedMemory = reserveOutgoingMemory;
    +    if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
    --- End diff --
    
       Because the first uncontrolled memory allocation happens when inserting into the hash table (i.e., put()). Given this "uncontrollability", it is better to hit the OOM there, where we can handle it by spilling and retrying. If all the memory were "given" in the limit, the put() might not OOM, yet leave too little available memory to continue (i.e., to create a values batch or an outgoing batch), and those situations we cannot handle.
       By subtracting from the limit a "reserve" for these two batches, we may force a put() OOM early (but that's OK), while also ensuring that the following two batches can be allocated. In a way this is similar to having multiple dedicated allocators, only simpler.
       Adding or subtracting is just an operation on a local field, so it has no performance effect. Also, by using a single allocator we can handle cases like a "bump" in a batch size (which might exceed the pre-allocation in a dedicated separate allocator).
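    
       The reserve idea described above can be sketched roughly as follows. This is a toy model, not the HashAgg code: `ToyAllocator` is a hypothetical stand-in that only tracks a limit and an allocated total, and `tryAllocate()` returns false where a real allocator would fail with an OOM:
    
    ```java
    public final class ReserveSketch {
      /** Hypothetical stand-in for an allocator with an adjustable limit. */
      static final class ToyAllocator {
        private long limit;
        private long allocated;
        ToyAllocator(long limit) { this.limit = limit; }
        long getLimit() { return limit; }
        void setLimit(long newLimit) { limit = newLimit; }
        long getAllocatedMemory() { return allocated; }
        /** Returns false (instead of throwing) when the request exceeds the limit. */
        boolean tryAllocate(long bytes) {
          if (allocated + bytes > limit) { return false; }
          allocated += bytes;
          return true;
        }
      }
    
      private final ToyAllocator allocator;
      private final long reserveSize;
      private long reserve;  // bytes currently held back from the limit
    
      ReserveSketch(ToyAllocator allocator, long reserveSize) {
        this.allocator = allocator;
        this.reserveSize = reserveSize;
        // Hold the reserve back by lowering the limit up front.
        allocator.setLimit(allocator.getLimit() - reserveSize);
        this.reserve = reserveSize;
      }
    
      /** Give the reserve back to the limit just before the allocation that needs it. */
      void useReserve() {
        if (reserve > 0) { allocator.setLimit(allocator.getLimit() + reserve); }
        reserve = 0;
      }
    
      /** Re-establish the reserve if enough memory is free; returns whether it succeeded. */
      boolean restoreReserve() {
        if (reserve > 0) { return true; }
        long free = allocator.getLimit() - allocator.getAllocatedMemory();
        if (free < reserveSize) { return false; }
        allocator.setLimit(allocator.getLimit() - reserveSize);
        reserve = reserveSize;
        return true;
      }
    }
    ```
    
       In this model, a failed `restoreReserve()` corresponds to the case where the caller has to spill before the reserve can be re-established.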
    



---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by Ben-Zvi <gi...@git.apache.org>.
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r140098512
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/UserException.java ---
    @@ -536,6 +542,33 @@ public Builder pushContext(final String name, final double value) {
          * @return user exception
          */
         public UserException build(final Logger logger) {
    +
    +      // To allow for debugging:
    +      //     A spinner code to make the execution stop here while the file '/tmp/drillspin' exists
    --- End diff --
    
    Done


---

[GitHub] drill pull request #938: DRILL-5694: Handle HashAgg OOM by spill and retry, ...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939116
  
    --- Diff: common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java ---
    @@ -0,0 +1,32 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.common.exceptions;
    +
    +import org.apache.drill.exec.proto.UserBitShared;
    +
    +/**
    + *  A special exception to be caught by caller, who is supposed to free memory by spilling and try again
    + *
    + */
    +public class RetryAfterSpillException extends UserException {
    --- End diff --
    
    If this exception is thrown and caught internally, it should not extend `UserException`. Instead it should extend the Java `RuntimeException`.
    
    Better, since you know you must catch this, this should be a checked exception, extended from `Exception` and declared by the method that throws it.
    
    `UserException` is purely for exceptions reported to the user.
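    
    A minimal sketch of what the checked-exception variant could look like, using toy types rather than the real hash table (`RetrySketch`, `ToyTable` and `insert()` are hypothetical names; only the exception name comes from this PR):
    
    ```java
    public final class RetrySketch {
      /** Checked, so every caller of put() must catch or declare it. */
      static class RetryAfterSpillException extends Exception {
        RetryAfterSpillException() { super("retry after spill"); }
      }
    
      /** Hypothetical stand-in for a hash table whose capacity is counted in rows. */
      static final class ToyTable {
        private final int capacity;
        private int rows;
        int spills;
        ToyTable(int capacity) { this.capacity = capacity; }
        void put(int row) throws RetryAfterSpillException {
          if (rows >= capacity) { throw new RetryAfterSpillException(); }
          rows++;
        }
        void spill() { rows = 0; spills++; }
      }
    
      /** Insert a row; on a retry signal, spill once and retry the insert. */
      static void insert(ToyTable table, int row) {
        try {
          table.put(row);
        } catch (RetryAfterSpillException e) {
          table.spill();
          try {
            table.put(row);
          } catch (RetryAfterSpillException again) {
            throw new IllegalStateException("still out of memory after spill", again);
          }
        }
      }
    }
    ```
    
    Because the exception is checked, the compiler rejects any caller of `put()` that neither handles the retry nor declares it.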


---