Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/24 15:29:52 UTC

[GitHub] [arrow-datafusion] alamb opened a new issue, #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

alamb opened a new issue, #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941

   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   The basic challenge is that DataFusion can use an unbounded amount of memory for running a plan which typically results in DataFusion being killed by some system memory protection limit (e.g. the OOM Killer on Linux). See https://github.com/apache/arrow-datafusion/issues/587   for more details
   
   As a first step towards supporting larger datasets in DataFusion, if a plan will exceed the overall budget, it should generate a runtime error rather than exceeding the budget and risking being killed.
   
   For example, if the hash aggregate exceeds its budget, it will return a runtime error (resources exhausted).
   
   **Describe the solution you'd like**
   1. The user can define a limit for memory via [`MemoryManagerConfig`](https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/enum.MemoryManagerConfig.html)
   2. All operators that consume significant memory (Hash, Join, Sort) will properly account for and request memory from the `MemoryManager` via methods like [`try_grow`](https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/trait.MemoryConsumer.html#method.try_grow)
   3. If sufficient memory cannot be allocated, the plan should return [ResourcesExhausted](https://docs.rs/datafusion/13.0.0/datafusion/error/enum.DataFusionError.html#variant.ResourcesExhausted)
   
   Needed:
   - [ ] Use `MemoryManager` in `SortExec`, and return errors if the memory budget is exceeded
   - [ ] Use `MemoryManager` in Aggregate operators, and return errors if the memory budget is exceeded
   - [ ] Use `MemoryManager` in Join operators, and return errors if the memory budget is exceeded
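   
   A minimal sketch of the accounting contract described above, using only the standard library. `MemoryBudget` and the `ResourcesExhausted` struct here are hypothetical stand-ins for illustration; they are not the DataFusion `MemoryManager` or `DataFusionError` types, and the real `try_grow` signature differs.
   
   ```rust
   use std::fmt;
   
   /// Hypothetical error, modelled loosely on the idea of a
   /// "resources exhausted" runtime error. Not the DataFusion type.
   #[derive(Debug, PartialEq)]
   pub struct ResourcesExhausted {
       pub requested: usize,
       pub available: usize,
   }
   
   impl fmt::Display for ResourcesExhausted {
       fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
           write!(
               f,
               "resources exhausted: requested {} bytes, {} available",
               self.requested, self.available
           )
       }
   }
   
   /// Hypothetical single-threaded budget that operators reserve memory from.
   pub struct MemoryBudget {
       limit: usize,
       used: usize,
   }
   
   impl MemoryBudget {
       pub fn new(limit: usize) -> Self {
           Self { limit, used: 0 }
       }
   
       /// Reserve `bytes`, or fail without changing the reservation.
       pub fn try_grow(&mut self, bytes: usize) -> Result<(), ResourcesExhausted> {
           // `used` never exceeds `limit`, so this subtraction cannot underflow.
           let available = self.limit - self.used;
           if bytes > available {
               return Err(ResourcesExhausted { requested: bytes, available });
           }
           self.used += bytes;
           Ok(())
       }
   
       /// Return `bytes` to the budget, for example after emitting output.
       pub fn shrink(&mut self, bytes: usize) {
           self.used = self.used.saturating_sub(bytes);
       }
   
       pub fn used(&self) -> usize {
           self.used
       }
   }
   ```
   
   An operator would call `try_grow` before enlarging its hash table or sort buffer and propagate the error to the caller instead of allocating past the limit.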
   
   **Describe alternatives you've considered**
   We can always increase  the accuracy of the memory allocation accounting  (e.g. `RecordBatch`es internal to  operators, etc). However, for this initial epic I would like to get the major consumers of memory instrumented and using the `MemoryManager` interface. Hopefully this will also allow 
   
   **Additional context**
   cc @yjshen  @crepererum 
   related to issues like https://github.com/influxdata/influxdb_iox/issues/5776 (and some internal issues of our own)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1289308478

   > When the memory budget is exceeded, should we try sort-merge join first?
   
   Yes, that would be amazing @xudong963  -- I think it is covered by https://github.com/apache/arrow-datafusion/issues/1599
   
   I think that getting the MemoryManager hooked into the join (so it knows when it would run out of memory) is probably a prerequisite to automatically switching to sort-merge join during execution. 




[GitHub] [arrow-datafusion] milenkovicm commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1289320935

   Should the memory limit be optimistic? What I mean is that, in the case of aggregation, we could first process a record batch, compare memory usage before and after the batch is processed, and request the delta from the memory manager. Otherwise we'd need to do two passes over the record batch, or request memory for every record, which may lead to contention on the memory manager and trigger a spill in the middle of batch processing. The end of batch processing would be a "safe point" which should have correct memory usage, or trigger a spill.
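   
   A rough sketch of this optimistic scheme, assuming a simple count-per-group aggregation. `Budget`, `CountAggregator`, and the fixed `BYTES_PER_GROUP` estimate are hypothetical names introduced for illustration; they do not correspond to DataFusion types.
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical budget tracker (not a DataFusion type).
   struct Budget {
       limit: usize,
       used: usize,
   }
   
   impl Budget {
       fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
           if self.used + bytes > self.limit {
               return Err(format!(
                   "resources exhausted: requested {} bytes, {} of {} already reserved",
                   bytes, self.used, self.limit
               ));
           }
           self.used += bytes;
           Ok(())
       }
   }
   
   /// Assumed cost of one group; a real operator would measure its state.
   const BYTES_PER_GROUP: usize = 16;
   
   struct CountAggregator {
       groups: HashMap<u64, u64>,
       budget: Budget,
   }
   
   impl CountAggregator {
       fn new(limit: usize) -> Self {
           Self { groups: HashMap::new(), budget: Budget { limit, used: 0 } }
       }
   
       fn estimated_size(&self) -> usize {
           self.groups.len() * BYTES_PER_GROUP
       }
   
       /// Process the whole batch first, then request only the growth.
       /// The memory manager is contacted once per batch, not once per row.
       fn process_batch(&mut self, keys: &[u64]) -> Result<(), String> {
           let before = self.estimated_size();
           for &key in keys {
               *self.groups.entry(key).or_insert(0) += 1;
           }
           // Groups are only ever added here, so `after >= before`.
           let after = self.estimated_size();
           // End of batch: the "safe point" where usage is reconciled.
           self.budget.try_grow(after - before)
       }
   }
   ```
   
   Note that when `process_batch` fails, the batch has already been applied, so actual usage may briefly overshoot the limit by up to one batch worth of new groups. That overshoot is the price of the optimistic approach.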




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1290852886

   > I think it's OK to have some slack room on top of the limit, which is somewhat controlled by the batch size.
   
   There is also `memory_fraction` on `MemoryManagerConfig` https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/enum.MemoryManagerConfig.html to account for some slack in estimates
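   
   As an illustration of how such a fraction can provide slack, the sketch below derives a usable budget from a maximum and a fraction. `effective_limit` is a hypothetical helper, and the validation range is an assumption; consult the linked documentation for the actual `MemoryManagerConfig` behavior.
   
   ```rust
   /// Hypothetical helper: the usable budget is `max_memory * memory_fraction`,
   /// leaving the remainder as slack for allocations that are not tracked.
   fn effective_limit(max_memory: usize, memory_fraction: f64) -> Result<usize, String> {
       // Assumed valid range: (0, 1]. The negated form also rejects NaN.
       if !(memory_fraction > 0.0 && memory_fraction <= 1.0) {
           return Err(format!(
               "memory_fraction must be in (0, 1], got {}",
               memory_fraction
           ));
       }
       Ok((max_memory as f64 * memory_fraction) as usize)
   }
   ```
   
   With a maximum of 1024 bytes and a fraction of 0.5, operators could reserve at most 512 bytes, and the other half absorbs estimation error.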




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1297032108

   Also, as one might imagine given our renewed interest in this ticket, someone from the IOx team may start working on generating runtime errors in the next few days as well




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1290676317

   > Should the memory limit be optimistic? What I mean is that, in the case of aggregation, we could first process a record batch, compare memory usage before and after the batch is processed, and request the delta from the memory manager.
   
   Yes, I agree this would be the best strategy: as memory is needed, we request more memory from the memory manager incrementally.
   
   > End of batch processing would be a "safe point" which should have correct memory usage, or trigger spill.
   wdyt?
   
   Yes
   
   In general I think upgrading / improving the memory manager would likely be fine. 
   




[GitHub] [arrow-datafusion] yjshen commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1291985225

   Hi, sorry to join this party late.
   
   I don't quite get why the async is a problem for the memory manager while implementing aggregation.
   
   We could do memory-limited aggregation as follows:
   
   1. try to allocate a fixed-sized buffer for incoming groups (for example, a record batch, or a row batch if row aggregation is possible, with 8096 rows).
   2. aggregate incoming records, updating existing buffer slots or adding new buffer rows if there's space left in the last pre-allocated buffer.
   3. if the last allocated aggregate buffer is full, we `try_grow` the aggregate's memory by allocating another fixed-sized buffer.
      - if more memory quota is granted, go to 2 and continue aggregating.
      - if we fail to get one more aggregation buffer from the memory manager, we spill (see the spill procedure described below).
   4. if the input stream is exhausted, we could either:
      - get a sorted iterator of the in-memory aggregation buffers, do multi-way merging with the spills, produce the final results, and finally free all memory used (if the aggregate is final), or
      - output the in-memory buffers and free all the memory (if the aggregate is partial).
   
   The spill procedure would be:
   
   - if the aggregate is partial, produce partial output based on all the aggregation buffers we have, free all the memory, and go to agg step 1. to handle the following incoming tuples.
   - if the aggregate is final, sort all the current buffers by group key and spill to disk, store it in the spilled files list, and go back to agg step 1. to handle the following incoming tuples.
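   
   The partial-aggregate path of the procedure above might be sketched as follows. This is only an illustration under simplifying assumptions: it counts rows per `u64` key, treats a "buffer" as a fixed number of group slots with an assumed byte cost, and uses hypothetical names (`PartialCount`, `BUFFER_GROUPS`, `BUFFER_BYTES`) that do not exist in DataFusion. The final-aggregate path (sort, spill to disk, multi-way merge) is not shown.
   
   ```rust
   use std::collections::HashMap;
   
   /// Assumed number of groups per fixed-sized buffer.
   const BUFFER_GROUPS: usize = 4;
   /// Assumed memory cost of one buffer, in bytes.
   const BUFFER_BYTES: usize = 64;
   
   /// Hypothetical memory-limited partial aggregate (count per key).
   struct PartialCount {
       limit: usize,
       reserved: usize,
       capacity: usize, // group slots covered by the reserved buffers
       groups: HashMap<u64, u64>,
       emitted: Vec<Vec<(u64, u64)>>, // partial outputs produced so far
   }
   
   impl PartialCount {
       fn new(limit: usize) -> Self {
           Self { limit, reserved: 0, capacity: 0, groups: HashMap::new(), emitted: Vec::new() }
       }
   
       /// Steps 1 and 3: reserve one more fixed-sized buffer if the budget allows.
       fn try_grow_buffer(&mut self) -> bool {
           if self.reserved + BUFFER_BYTES > self.limit {
               return false;
           }
           self.reserved += BUFFER_BYTES;
           self.capacity += BUFFER_GROUPS;
           true
       }
   
       /// Partial "spill": emit what we have and free all reserved memory.
       fn flush(&mut self) {
           let mut out: Vec<(u64, u64)> = self.groups.drain().collect();
           out.sort_unstable();
           if !out.is_empty() {
               self.emitted.push(out);
           }
           self.reserved = 0;
           self.capacity = 0;
       }
   
       /// Step 2: update an existing slot, or add a new group if there is room.
       fn update(&mut self, key: u64) -> Result<(), String> {
           let is_new = !self.groups.contains_key(&key);
           if is_new && self.groups.len() == self.capacity && !self.try_grow_buffer() {
               // No quota for another buffer: flush and restart from step 1.
               self.flush();
               if !self.try_grow_buffer() {
                   return Err("resources exhausted: budget is smaller than one buffer".to_string());
               }
           }
           *self.groups.entry(key).or_insert(0) += 1;
           Ok(())
       }
   
       /// Step 4 (partial case): output the in-memory buffers and free memory.
       fn finish(mut self) -> Vec<Vec<(u64, u64)>> {
           self.flush();
           self.emitted
       }
   }
   ```
   
   Because the aggregate is partial, the same key may appear in more than one emitted chunk; a downstream final aggregate is expected to combine them.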
   
   Also, one could refer to [Apache Spark's hash aggregate impl](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L32-L49) if interested.




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1329268391

   An update here: we have added memory limit enforcement for Sort / Grouping. I plan to write some SQL-level tests for this shortly. The only remaining work item is memory limiting for joins.
   
   Given that joins are currently undergoing some serious rework (@liukun4515 @jackwener @mingmwang and others, e.g. https://github.com/apache/arrow-datafusion/pull/4377), we don't plan to add memory limits to them until that settles down.




[GitHub] [arrow-datafusion] alamb closed issue #3941: [Epic] Generate runtime errors if the memory budget is exceeded

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #3941: [Epic] Generate runtime errors if the memory budget is exceeded
URL: https://github.com/apache/arrow-datafusion/issues/3941




[GitHub] [arrow-datafusion] crepererum commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
crepererum commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1290334270

   I think it's OK to have some slack room on top of the limit, which is somewhat controlled by the batch size. It's unlikely that we are going to account for every single byte anyway, since there might be some auxiliary data structures here and there that are heap-allocated. So I would treat this as a "best effort limit w/o impacting performance (too much)".
   
   We could later (if there's demand for it) add a config option "strict memory enforcement" that impacts performance.




[GitHub] [arrow-datafusion] milenkovicm commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1291849770

   Thanks @alamb, 
   
   As I said in a previous comment, the biggest problem with integrating aggregation with the memory manager is the `async` on the `MemoryConsumer::try_grow` function. `async` is there to facilitate `async spill(..)` when additional memory can't be acquired.
   
   Easy ways to work around `async` would be either to:
   * de-couple `try_grow` and `spill`, removing `async` from `try_grow`, and do a manual `spill` in case of failure.
   * have the component use the memory manager directly via `memory_manager().can_grow_directly` (we need to make this function public), and do a manual spill in case of failure.
   
   wdyt?
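   
   To make the first option concrete, here is a sketch of a synchronous `try_grow` called from a `poll_next`-shaped method, with the spill done manually on failure. It uses only the standard library: `AggStream`, `Reservation`, and `drain` are hypothetical, the method mirrors the shape of `Stream::poll_next` without depending on the `futures` crate, and batches are represented by their size in bytes.
   
   ```rust
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};
   
   /// Waker that does nothing; only needed to build a `Context` for the demo.
   struct NoopWake;
   impl Wake for NoopWake {
       fn wake(self: Arc<Self>) {}
   }
   
   /// Hypothetical reservation with a synchronous `try_grow`.
   struct Reservation {
       limit: usize,
       used: usize,
   }
   
   impl Reservation {
       /// Reports failure immediately instead of awaiting a spill.
       fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
           if self.used + bytes > self.limit {
               return Err(format!("resources exhausted: cannot reserve {} bytes", bytes));
           }
           self.used += bytes;
           Ok(())
       }
   
       fn free_all(&mut self) {
           self.used = 0;
       }
   }
   
   struct AggStream {
       input: std::vec::IntoIter<usize>, // batch sizes standing in for record batches
       reservation: Reservation,
       spills: usize,
   }
   
   impl AggStream {
       /// Stand-in for writing state to disk and releasing memory.
       fn spill(&mut self) {
           self.spills += 1;
           self.reservation.free_all();
       }
   
       /// Non-async, like `Stream::poll_next`; no `.await` is needed.
       fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<usize, String>>> {
           let batch_bytes = match self.input.next() {
               Some(bytes) => bytes,
               None => return Poll::Ready(None),
           };
           if self.reservation.try_grow(batch_bytes).is_err() {
               // The operator decides what to do on failure: here, spill and retry once.
               self.spill();
               if let Err(e) = self.reservation.try_grow(batch_bytes) {
                   return Poll::Ready(Some(Err(e)));
               }
           }
           Poll::Ready(Some(Ok(batch_bytes)))
       }
   }
   
   /// Drive the stream to completion with a no-op waker.
   fn drain(stream: &mut AggStream) -> Vec<Result<usize, String>> {
       let waker = Waker::from(Arc::new(NoopWake));
       let mut cx = Context::from_waker(&waker);
       let mut out = Vec::new();
       while let Poll::Ready(Some(item)) = stream.poll_next(&mut cx) {
           out.push(item);
       }
       out
   }
   ```
   
   Keeping the failure handling in the operator also leaves room for strategies other than spilling, such as returning the error directly.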




[GitHub] [arrow-datafusion] crepererum commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
crepererum commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1290545014

   <https://docs.rs/futures/latest/futures/stream/fn.unfold.html> maybe?




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1293604612

   I think we should consider reducing our hash implementation repetition prior to implementing spilling #2723 
   
   In general I think @yjshen 's algorithm for grouping in limited memory is 💯 
   
   I think we can implement it in stages, however, with the first stage being tracking the current memory used and erroring when the budget is exceeded.
   
   Then in the second stage, rather than erroring we can implement the externalized / spilling strategy




[GitHub] [arrow-datafusion] milenkovicm commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1290489703

   One thing I can't really wrap my head around is the use of `async` with `try_grow`.
   
   The issue I have is with the use of `try_grow` within a struct that implements `Stream`, e.g. `GroupedHashAggregateStreamV2`, which actually handles the `RecordBatch`es:
   
   ```rust
   impl Stream for GroupedHashAggregateStreamV2 {
       type Item = ArrowResult<RecordBatch>;
   
       fn poll_next(
           mut self: std::pin::Pin<&mut Self>,
           cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           let this = &mut *self;
           // Does not compile: `await` is only allowed inside `async` functions and blocks
           this.mem_manager.try_grow(200).await;
       }
   }
   ```
   
   Does anybody have a better idea of how to integrate it?




[GitHub] [arrow-datafusion] alamb commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1291869970

   > de-couple try_grow and spill, removing async from try_grow, do manual spill in case of failure.
   
   I think decoupling try_grow and spill would be my preference. In general, if memory is exceeded we may not actually want to spill for all operators (e.g. an initial hash table might simply flush its output if it exceeded memory rather than trying to spill)
   
   cc @yjshen  in case he has some additional thoughts




[GitHub] [arrow-datafusion] alamb commented on issue #3941: [Epic] Generate runtime errors if the memory budget is exceeded

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1586276737

   I believe we have completed all initially planned work for generating runtime errors if the memory budget is exceeded, so closing this issue 🎉




[GitHub] [arrow-datafusion] xudong963 commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
xudong963 commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1289230359

   For the join operator, we have sort-merge join.
   
   When the memory budget is exceeded, should we try sort-merge join first?




[GitHub] [arrow-datafusion] milenkovicm commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1292030069

   Thanks for your comment @yjshen,
   
   The concern raised in previous comments is not about the spill algorithm for aggregation; it is about the interaction between non-async and `async` code.
   
   As implemented, `GroupedHashAggregateStreamV2` lives in the non-async world, but the memory manager / memory consumer expose `async` methods which can't be called from `poll_next`:
   
   ```rust
   impl Stream for GroupedHashAggregateStreamV2 {
       type Item = ArrowResult<RecordBatch>;
   
       fn poll_next(
           mut self: std::pin::Pin<&mut Self>,
           cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           let this = &mut *self;
           // Does not compile: `await` is only allowed inside `async` functions and blocks
           this.mem_manager.try_grow(200).await;
           // rest of the code which does aggregation and spill
       }
   }
   ```
   so the question is how to bridge the gap
   




[GitHub] [arrow-datafusion] yjshen commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1292050231

   Thanks for the explanation! 
   
   Sort in DataFusion is currently memory limited, and I think we could apply a similar approach in aggregation like that in sort: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/sorts/sort.rs#L799-L813. 
   
   We could make `do_agg` async, as is done for `do_sort`.




[GitHub] [arrow-datafusion] yjshen commented on issue #3941: Generate runtime errors if the memory budget is exceeded [EPIC]

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #3941:
URL: https://github.com/apache/arrow-datafusion/issues/3941#issuecomment-1296463942

   Thanks @alamb for reminding me! I'll start working on #2723 this week.

