Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/03 04:23:31 UTC

[GitHub] [druid] kfaraz opened a new issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

kfaraz opened a new issue #12022:
URL: https://github.com/apache/druid/issues/12022


   ### Motivation
   
   The existing implementation in `OnHeapIncrementalIndex` tends to over-estimate memory usage,
   leading to more persist cycles than necessary during ingestion. A more accurate estimation
   of memory usage would also free up heap for other purposes.
   
   The recent changes in #11950 have made improvements by adding a `guessAggregatorHeapFootprint()` method to the `Aggregator` interface. This proposal addresses the same problem but advocates replacing the guess with the actual incremental memory used by an aggregator.
   
   ### Proposed changes
   
   - Update the method `Aggregator.aggregate()` to return a `long` instead of `void`. The returned long would represent the incremental memory in bytes used by the aggregator in that particular invocation of the method.
   - Remove the method `DimensionIndexer.estimateEncodedKeyComponentSize()`
   - Update the method `DimensionIndexer.getUnsortedEncodedValueFromSorted()` to return objects of a new generic class `EncodedDimensionValue<EncodedType>` which contains:
      - `EncodedType value`: e.g. `int[]` for `StringDimensionIndexer`, `Long` for `LongDimensionIndexer`
      - `long incrementalSize`: the delta in size required for the `value`. For numerical values, e.g. in `LongDimensionIndexer`, it would just be the size of the datatype. But for `StringDimensionIndexer`, which returns an encoded `int[]`, it would represent the size of the array plus the size of any _new_ dimension value that has been encoded into the dictionary in this invocation. Simply put, `getUnsortedEncodedValueFromSorted()` now returns a payload along with the memory required for that payload.
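   The proposed interface changes could be sketched roughly as follows (a simplified illustration, not Druid's actual interfaces, which carry more methods and parameters):

   ```java
   // aggregate() now reports the incremental heap bytes used by this invocation.
   interface Aggregator
   {
     long aggregate();
   }

   // Returned by DimensionIndexer.getUnsortedEncodedValueFromSorted():
   // the encoded payload plus the heap delta required to store it.
   class EncodedDimensionValue<EncodedType>
   {
     final EncodedType value;     // e.g. int[] for strings, Long for longs
     final long incrementalSize;  // bytes newly required for this value

     EncodedDimensionValue(EncodedType value, long incrementalSize)
     {
       this.value = value;
       this.incrementalSize = incrementalSize;
     }
   }
   ```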
   
   ### Rationale
   
   Estimation of a row size
   `row size = aggregator size + dims key size + overhead`
   
   __Aggregator Size__
   Currently, we compute the _max_ size for the aggregator and use it for every row, which overestimates the actual memory usage. With the proposed change, we would use the actual footprint of each row rather than the max, thus getting more accurate estimates.
   
   __Dims Key Size__
   The estimation of the dimension key size already takes into account the current row rather than the max size. But here, the overestimates are caused by repeatedly adding the footprint of the same String values (especially in the case of multi-valued dimensions), which are in fact stored _only once_ in the dictionary while only the integer codes are present in every row. With the proposed change, we add the footprint of a String value only once, when it is newly added to the dictionary.
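   To illustrate the intended accounting (a hypothetical sketch, not Druid's actual dictionary class; the per-entry overhead constant is a rough assumption):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical dictionary that charges a String's heap footprint only on
   // first insertion; repeated values cost only the 4-byte id stored per row.
   class SizeTrackingDictionary
   {
     private final Map<String, Integer> idLookup = new HashMap<>();
     private long lastIncrementalSize;

     int encode(String value)
     {
       Integer existing = idLookup.get(value);
       if (existing != null) {
         lastIncrementalSize = Integer.BYTES;  // only the id in the row
         return existing;
       }
       int id = idLookup.size();
       idLookup.put(value, id);
       // Rough estimate: id in the row + 2 bytes per char + object overhead.
       lastIncrementalSize = Integer.BYTES + 2L * value.length() + 48;
       return id;
     }

     long lastIncrementalSize()
     {
       return lastIncrementalSize;
     }
   }
   ```

   With this shape, a multi-valued row repeating the same String many times adds its full footprint only once.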
           
   ### Backwards Compatibility
   
   Neither of the changes mentioned above would be backwards compatible.
   
   __Aggregator Size__
   Any class implementing the `Aggregator` interface would need to be fixed to return a `long` instead of `void`.
   
   Workaround (Rejected):
   To retain compatibility, we could add a new `default long aggregateAndEstimateMemory()` method and leave the existing `aggregate()` method as is. The default implementation would return a negative value (say -1), in which case the `OnHeapIncrementalIndex` or any other estimator would use the max estimated size for that invocation.
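   For reference, the rejected default-method shape would have looked roughly like this sketch:

   ```java
   // Rejected workaround: keep aggregate() as is and add a default method
   // whose -1 sentinel tells the caller to fall back to the max estimated size.
   interface Aggregator
   {
     void aggregate();

     default long aggregateAndEstimateMemory()
     {
       aggregate();
       return -1L;
     }
   }
   ```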
   
   But this approach would be pretty hacky and problematic in the long run, as callers of the `Aggregator` would be free to call either of the two aggregate methods, thus producing wildly different and erroneous memory estimations.
   
   __Dims Key Size__
   Any class implementing `DimensionIndexer` would have to be fixed. (This change is less of a compatibility concern than the `Aggregator` change, as `DimensionIndexer` has fewer implementations.)
   
   Workaround (Rejected):
   To retain compatibility, we could retain `DimensionIndexer.estimateEncodedKeyComponentSize()`, which would account for a String value only when it is newly encountered. But this would require maintaining two dictionaries: the first used by `getUnsortedEncodedValueFromSorted()` to track encoded String values, and the second used by `estimateEncodedKeyComponentSize()` to track estimated String values.
   
   This workaround would introduce unnecessary complexity and overhead of maintaining two dictionaries.
   
   ### Operational impact
   
   - All aggregator implementations in extensions would have to be updated.
   - Rolling Upgrade: Not affected
   
   ### Future Work (optional)
   
   The memory estimate values returned by the updated `aggregate()` method could be used by other callers (such as `TimeseriesQueryEngine`) to estimate memory if required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz edited a comment on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
kfaraz edited a comment on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-985976878


   Thanks a lot for the feedback, @gianm !
   
   > One example is the building of bitmaps during persist, which IIRC happens on heap. I've noticed in the past that this can have substantial footprint too.
   
   Thanks for calling this out, I will try to find other such sources of overhead that are not being estimated right now.
   





[GitHub] [druid] gianm commented on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-986076579


   To me this feels like a change that likely doesn't require incompatibility in order to perform well, for a couple specific reasons (below). So I do think we should do benchmarking before deciding to make things incompatible.
   
   > doing this in the proposed backwards compatible way adds a branch after every single call to the aggregator which, given that this is a hot method, could have significant inadvertent impacts to query performance
   
   I was thinking that query-time callers would stick with `void aggregate()`, so no branching would be needed on those paths. Currently the users of `Aggregator` are timeseries and topN, which have a natural limitation on the number of Aggregators they create anyway: timeseries only makes one set per time bucket and topN has its threshold. In practice I haven't noticed any issues with these query types running out of memory, so I don't think there is a reason to add the overhead of size estimation.
   
   > My only worry with that is that the most common implementation of Aggregator.aggregate would likely be via that default implementation and I'm not sure if this would be adding yet another virtual function call to the hot path.
   
   This one seems like the kind of thing that the JVM should be able to optimize. Also, for the reason above, it's a code path that I'd only expect to get called during ingestion anyway, where aggregation time is not as big a driver of overall performance as it is during queries.




[GitHub] [druid] kfaraz commented on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
kfaraz commented on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-986788236


   Based on the discussion above, we will modify the aggregator interfaces as follows:
   
   Interface `AggregatorFactory` will get a new `factorizeWithSize()` method which returns a structure containing both the `Aggregator` instance as well as the initial memory size.
   
   Interface `Aggregator` will get a new `aggregateWithSize()` method which returns a long representing the incremental memory used in that invocation of `aggregateWithSize()`. The default impl of this method would call `aggregate()` and return 0. Aggregators such as `sum` can rely on the default impl itself, always returning 0 from `aggregateWithSize()`, effectively making the total aggregator size the same as the initial size returned from `AggregatorFactory.factorizeWithSize()`.
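   A simplified sketch of this agreed-upon shape (the holder class name `AggregatorAndSize` and the trimmed signatures are illustrative; Druid's real interfaces take more parameters):

   ```java
   // Holder returned by factorizeWithSize(): the aggregator plus its
   // initial heap footprint.
   class AggregatorAndSize
   {
     final Aggregator aggregator;
     final long initialSizeBytes;

     AggregatorAndSize(Aggregator aggregator, long initialSizeBytes)
     {
       this.aggregator = aggregator;
       this.initialSizeBytes = initialSizeBytes;
     }
   }

   interface Aggregator
   {
     void aggregate();

     // Default keeps existing implementations compatible: fixed-size
     // aggregators such as sum never grow after factorization, so 0 is correct.
     default long aggregateWithSize()
     {
       aggregate();
       return 0L;
     }
   }

   abstract class AggregatorFactory
   {
     abstract Aggregator factorize();

     abstract long getMaxIntermediateSize();

     // Default pushes the current (max) size estimate into the holder.
     AggregatorAndSize factorizeWithSize()
     {
       return new AggregatorAndSize(factorize(), getMaxIntermediateSize());
     }
   }
   ```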
   
   In the first iteration, we would put the new estimation logic behind a feature flag. After some more testing, the flag can be removed altogether.
   
   The following metrics will also be added (if they don't already exist):
   - number of incremental persists
   - num rows/num bytes in each persist
   - time taken to fill up buffer
   - time taken to persist buffer




[GitHub] [druid] cheddar commented on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
cheddar commented on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-985985486


   > > Workaround (Rejected):
   > > To retain compatibility, we could add a new default long aggregateAndEstimateMemory() method and leave the existing aggregate() method as is. The default implementation would return a negative value (say -1) in which case the OnHeapIncrementalIndex or any other estimator would use the max estimated size for that invocation.
   > 
   > This approach actually seems like a good idea to me for a few reasons:
   > 
   >     * Aggregation is hot code, and there will be some overhead to size estimation, and query-time callers don't necessarily need it. So having two methods allows the caller to decide if it actually needs size estimates or not.
   > 
   >     * We want to avoid incompatible changes to aggregation interfaces whenever possible.
   
   While this proposal does introduce a pretty big backwards incompatibility, and I hate introducing such incompatibilities, doing this in the proposed backwards compatible way adds a branch after every single call to the aggregator which, given that this is a hot method, could have significant inadvertent impacts on query performance.  Having the method be guaranteed to return a value that is always added minimizes that impact.
   
   That said, with just the proposed interface, the conditional still moves into the implementation of the Aggregator itself for simple aggregators like a sum aggregator where it's a fixed memory size from initialization as the first call to `aggregate()` doubles as the "initialization" call.
   
   Thinking about this gave me another idea for how to minimize the incompatibility:
   
   We could add a method `getInitializedMemorySize()` which returns how much memory is consumed by just being initialized.  For static-sized aggregators, this would essentially equate to returning the current size estimation from this one method and then always returning 0 from the `aggregate()` call.  
   
   Alternatively, instead of adding a method to the Aggregator interface, we could give `AggregatorFactory` an `AggregatorFactory.factorizeWithSize()` which returns a "holder" object that has both a reference to the `Aggregator` and its size.  Given that `AggregatorFactory` is the thing that knows the intermediate size right now, this approach would actually allow for a default implementation that pushes the current size into the return object.  If we then create an `Aggregator.aggregateWithSize()` method, it could default to an implementation that calls `aggregate()` and returns 0, which now actually does make this a completely backwards compatible change.  My only worry with that is that the most common implementation of `Aggregator.aggregate` would likely be via that default implementation and I'm not sure if this would be adding yet another virtual function call to the hot path.
   
   The lowest-risk approach is probably to add `factorizeWithSize()` and still give `aggregate()` a return type.  This is backwards incompatible, but the vast majority of implementations just need to add a `return 0;`, so at least the change required is minimized.
   
   Perhaps we try the fully backwards compatible approach and benchmark it, then adjust to the incompatible, non-default method, benchmark that, and see if there's a marked difference between them?




[GitHub] [druid] gianm commented on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-985596977


   > Workaround (Rejected):
   > To retain compatibility, we could add a new default long aggregateAndEstimateMemory() method and leave the existing aggregate() method as is. The default implementation would return a negative value (say -1) in which case the OnHeapIncrementalIndex or any other estimator would use the max estimated size for that invocation.
   
   This approach actually seems like a good idea to me for a few reasons:
   
   - Aggregation is hot code, and there will be some overhead to size estimation, and query-time callers don't necessarily need it. So having two methods allows the caller to decide if it actually needs size estimates or not.
   - We want to avoid incompatible changes to aggregation interfaces whenever possible. 




[GitHub] [druid] kfaraz closed issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
kfaraz closed issue #12022:
URL: https://github.com/apache/druid/issues/12022


   




[GitHub] [druid] gianm commented on issue #12022: [Proposal] Improve memory estimations in `OnHeapIncrementalIndex`

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12022:
URL: https://github.com/apache/druid/issues/12022#issuecomment-985604924


   Other than the point in the comment above about the aggregator interface, this design looks good to me.
   
   There's one potential issue I wanted to bring up: right now, there are _other_ sources of overhead that the estimation doesn't capture. It's mostly fine because the current estimation is an overestimate, so it "covers", in a sense, for other things that are not explicitly estimated. So when we make these estimates more accurate we may find we need to add estimation for more kinds of things.
   
   One example is the building of bitmaps during persist, which IIRC happens on heap. I've noticed in the past that this can have substantial footprint too. You should be able to repro a case where the footprint is big by having a multi value column that has 100+ values per row. I was working with a dataset like that when I first noticed this bitmap thing. For this particular one, maybe we can set a max dictionary size?



