Posted to commits@druid.apache.org by "LakshSingla (via GitHub)" <gi...@apache.org> on 2023/11/23 06:22:56 UTC

[PR] Fix deadlock that can occur while merging group by results (druid)

LakshSingla opened a new pull request, #15420:
URL: https://github.com/apache/druid/pull/15420

   ### Description
   
   When merging higher-order group by results on the broker (primarily), and possibly on historicals (under special conditions, when the subquery is pushed down to the historical), a deadlock can occur depending on the structure of the query.
   The primary cause is that we currently acquire merge buffers in two places: `GroupByQueryQueryToolChest` (where we merge the higher-level results from the runners) and `GroupByMergingQueryRunnerV2` (which merges the results from the runners of multiple segments). Until now, these two were incorrectly assumed to be mutually exclusive:
   1. `GroupByQueryQueryToolChest` - Assumed to acquire merge buffers on the brokers only, because the historicals receive the query with the subqueries and the grouping sets set to null and therefore acquire no merge buffers here. This does not always hold: when the subquery itself is pushed down, the historicals also acquire merge buffers at this place.
   2. `GroupByMergingQueryRunnerV2` - Assumed to acquire merge buffers on the historicals only, because it primarily merges the results from the query runners that run on the segments. This is also incorrect, because the broker acts as a data server itself when the data source is an inline data source and we run nested group bys on it.
   
   Therefore, there are conditions in which `GroupByQueryQueryToolChest` holds some merge buffers to merge the results of the returned runners, while the runner itself is a `GroupByMergingQueryRunnerV2` (or a decorated runner on top of it) that also requires merge buffers to merge the "segment-level" runners whose output feeds the toolchest's `mergeResults`. Because we do not acquire the resources in a single go, the following situation can happen:
   
   Total merge buffers on the broker: 2
   QueryA = QueryB = A query that needs 1 merge buffer to `merge results` (in toolchest), and 1 mergeBuffer for `GroupByMergingQueryRunnerV2` on the broker
   QueryA & QueryB are running simultaneously on the cluster
   
   | Time | QueryA actions | QueryB actions | Free buffers in system |
   |--------|--------|--------|--------|
   | 0 |  | | 2 |
   | 1 | toolchest acquires 1 buffer | | 1 |
   | 2 | | toolchest acquires 1 buffer | 0 |
   | 3 | mergingRunner tries to acquire 1 buffer (blocked) | | 0 |
   | 4 | | mergingRunner tries to acquire 1 buffer (blocked) | 0 |
   
   Either query would have succeeded in isolation, but now each is waiting for the other to release the single merge buffer it holds before it can proceed.
   
   This PR prevents such a deadlock by acquiring the merge buffers in a single place and passing them down to the runner that might need them.
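   
   The interleaving in the table, and the effect of reserving everything in one go, can be modelled with a small sketch. This is not Druid code: a `java.util.concurrent.Semaphore` stands in for the broker's merge buffer pool, the class and method names are hypothetical, and a failed non-blocking `tryAcquire` stands in for a call that would block forever.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   // Hypothetical model, not Druid code: the Semaphore stands in for the merge buffer pool.
   class MergeBufferDeadlockSketch
   {
     // Two-step acquisition, replaying the table for QueryA and QueryB.
     // Returns how many of the two merging runners obtained their buffer.
     static int twoStep(int totalBuffers)
     {
       Semaphore pool = new Semaphore(totalBuffers);
       pool.tryAcquire(1);                   // time 1: QueryA toolchest takes a buffer
       pool.tryAcquire(1);                   // time 2: QueryB toolchest takes a buffer
       boolean runnerA = pool.tryAcquire(1); // time 3: QueryA merging runner asks for one more
       boolean runnerB = pool.tryAcquire(1); // time 4: QueryB merging runner asks for one more
       // Neither query releases what it holds until its runner finishes,
       // so a "false" here corresponds to a runner that is blocked indefinitely.
       return (runnerA ? 1 : 0) + (runnerB ? 1 : 0);
     }
   
     // Single-step acquisition: each query reserves everything it needs at once
     // (2 buffers here), runs, and then releases. The queries are replayed one
     // after the other to keep the sketch deterministic.
     static int singleStep(int totalBuffers)
     {
       Semaphore pool = new Semaphore(totalBuffers);
       int completed = 0;
       for (int query = 0; query < 2; query++) {
         if (pool.tryAcquire(2)) {
           completed++;     // the query holds all of its buffers, so it can finish
           pool.release(2); // and hand them back for the next query
         }
       }
       return completed;
     }
   
     public static void main(String[] args)
     {
       System.out.println("two-step, runners that got a buffer: " + twoStep(2));
       System.out.println("single-step, queries completed: " + singleStep(2));
     }
   }
   ```
   
   With two buffers, the two-step variant leaves both runners without a buffer, while the single-step variant lets both queries finish, because a query either holds everything it needs or holds nothing.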
   
   This PR has:
   
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1970631403

   @gianm With the recent changes made to the PR, I wanted to revisit the questions posed [here](https://github.com/apache/druid/pull/15420#issuecomment-1878164205), and more, that will help in reviewing.
   
   **Assumptions**
   There's an attempt to link the places where the merge buffers are acquired (`mergeResults`) and where they are utilized (`mergeResults` and `mergeRunners`). However, Druid's code doesn't define any explicit contract between the arguments of these methods: the input to `mergeResults` can be any runner, and it should function the same. While this gives the methods flexibility and reusability, it also means the code has to make some implicit assumptions about what type of runner is passed to `mergeResults`:
   1. For a given query on a given server, only a single top-level `mergeResults` call is made, which collects the results from the various runners. The code will break if multiple nested `mergeResults` calls are made (unnested calls are fine, though they don't happen).
   2. There can be multiple `mergeRunners` calls, because `GroupByMergingQueryRunner` only needs the merge buffers for the top-level query runner; nested ones execute in an unoptimized way.
   3. `mergeResults` has some knowledge of whether the query runner passed to it is the one created by the corresponding toolchest's `mergeRunners` (which is the typical use case). This is encoded in the argument `willMergeRunner`, which is set by the callers. The only production use case where this isn't true is when the broker is merging the results gathered from the historicals.
   
   These are true to my knowledge at the time of the PR (and they should remain true unless there's some wacky change in the query stack). Also, these assumptions only need to hold for group-by queries, because only they require shared resources. All other queries don't, and they don't rely on any of these assumptions being correct.
   
   **Resource ID**
   Each query has a unique resource id, that is assigned to it when it enters the queryable server:
   * For brokers: It's the `ClientQuerySegmentWalker`
   * For historical: It's the `ServerManager`
   * For peons: It's the `SinkQuerySegmentWalker`
   These three classes are among the first places a query reaches when it begins processing, so if the resource id is allotted only at these places, it is guaranteed that nothing will overwrite it during execution.
   Note: Historicals and Peons could have used the same query id allotted by the brokers, however they assign their own because:
   a) The user can directly choose to query the data server (while debugging etc)
   b) UNIONs are treated as multiple separate queries when the broker sends them to the historicals. Therefore we require a unique id for each part of the union, and hence we need to reassign the resource id to the query's part, or else they'll end up sharing the same resource ID. 
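   
   A minimal sketch of the idea behind (b), under stated assumptions: the context key name and helper below are hypothetical, and a plain `Map` stands in for the query context. Each part of a union keeps the shared query id but gets its own resource id when it enters the data server.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   
   // Hypothetical sketch, not Druid code: a Map stands in for the query context.
   class ResourceIdSketch
   {
     // Hypothetical key name, chosen for illustration only.
     static final String RESOURCE_ID_KEY = "hypotheticalResourceId";
   
     // Returns a copy of the context with a fresh resource id. Any resource id that
     // was already present is overwritten, so every call yields a distinct id.
     static Map<String, Object> withFreshResourceId(Map<String, Object> context)
     {
       Map<String, Object> copy = new HashMap<>(context);
       copy.put(RESOURCE_ID_KEY, UUID.randomUUID().toString());
       return copy;
     }
   
     public static void main(String[] args)
     {
       Map<String, Object> fromBroker = new HashMap<>();
       fromBroker.put("queryId", "abc");
   
       // Two parts of the same union arrive with the same query id.
       Map<String, Object> part1 = withFreshResourceId(fromBroker);
       Map<String, Object> part2 = withFreshResourceId(fromBroker);
   
       System.out.println(part1.get("queryId").equals(part2.get("queryId")));
       System.out.println(part1.get(RESOURCE_ID_KEY).equals(part2.get(RESOURCE_ID_KEY)));
     }
   }
   ```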
   
   **Tests modifications**
   This section lays out the modifications made to the test cases.
   With the assumptions laid out, we need to modify the tests, because in many places they try to mimic the broker-historical interaction, which can lead to the code not working as intended because the assumptions don't hold. For example, in many test cases there are two nested `mergeResults` calls: the outer call mimics what the broker does, while the inner one mimics what the historical does.
   Therefore, we need to assign a unique resource id to each `mergeResults` call, and also make sure that the top-level `mergeResults` has `willMergeRunner = false`, since it's being called on top of another `mergeResults` runner, while the inner one has `willMergeRunner = true` because it's being called on actual runners.
   
   **Merge buffer allocation**
   The merge buffers are allocated and associated with a given resource id in the global pool `GroupByResourcesReservationPool`. Multiple attempts to insert the same resource id will fail, so we know that resources are allocated only once, as long as the query id doesn't change during the execution of the query. The pool is cleaned once `close()` is called on the reserved resources, and the mapping is removed, which ensures that the mapping doesn't keep growing as queries execute.
   
   The call to allocate the merge buffers in the pool is made by `mergeResults`, and it allocates the resources required for its own execution as well as for the execution of the `GroupByMergingQueryRunner` if `willMergeRunners=true`. The `GroupByMergingQueryRunner` doesn't allocate any resources; it assumes that the resources have been preallocated, and just takes them from the pool.
   
   Once the required merge buffers are allocated from the pool, they cannot be used by other queries until the `close()` method is called on the `GroupByQueryResource`. This is usually done with a call to `GroupByResourcesReservationPool#clean()`, which does this and also cleans up the mapping.
   While the `GroupByQueryResource` is unclosed, the merge buffers can be taken from it and given back to it as needed during the execution of the query. Giving a buffer back does not release it to the global pool; it only signals that the work of that execution unit is complete and the buffer can be reused (or closed safely). Closing the `GroupByQueryResources` before all the merge buffers have been returned by the individual execution units logs a warning, but doesn't throw. The resources get freed up, and if an execution unit was actually still using them for something, it can error out.
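   
   The reserve / take / clean lifecycle described above can be sketched as follows. This is an illustrative model only, not the actual `GroupByResourcesReservationPool`: the class and method names are hypothetical, buffers are represented by a simple count, and `reserve` returns `false` where a real pool would presumably block or time out.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch, not Druid code: buffers are modelled as a plain counter.
   class ReservationPoolSketch
   {
     private final Map<String, Integer> reservedByResourceId = new HashMap<>();
     private int freeBuffers;
   
     ReservationPoolSketch(int totalBuffers)
     {
       this.freeBuffers = totalBuffers;
     }
   
     // Reserves everything a query needs in one go. Reserving twice under the
     // same resource id fails, so a query can only ever hold one reservation.
     synchronized boolean reserve(String resourceId, int numBuffers)
     {
       if (reservedByResourceId.containsKey(resourceId)) {
         throw new IllegalStateException("Resource id already reserved: " + resourceId);
       }
       if (numBuffers > freeBuffers) {
         return false; // assumption: stands in for blocking or timing out
       }
       freeBuffers -= numBuffers;
       reservedByResourceId.put(resourceId, numBuffers);
       return true;
     }
   
     // Looks up an existing reservation without allocating anything new.
     synchronized int take(String resourceId)
     {
       Integer reserved = reservedByResourceId.get(resourceId);
       if (reserved == null) {
         throw new IllegalStateException("Nothing reserved for: " + resourceId);
       }
       return reserved;
     }
   
     // Returns the buffers to the global pool and drops the mapping.
     synchronized void clean(String resourceId)
     {
       Integer reserved = reservedByResourceId.remove(resourceId);
       if (reserved != null) {
         freeBuffers += reserved;
       }
     }
   
     synchronized int available()
     {
       return freeBuffers;
     }
   
     public static void main(String[] args)
     {
       ReservationPoolSketch pool = new ReservationPoolSketch(2);
       pool.reserve("queryA", 2);                       // the mergeResults side reserves
       System.out.println(pool.take("queryA"));         // the merging runner only takes
       System.out.println(pool.reserve("queryB", 2));   // nothing free until queryA is cleaned
       pool.clean("queryA");
       System.out.println(pool.reserve("queryB", 2));
     }
   }
   ```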




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-2068824109

   The coverage checks are failing, but only because of the refactoring done on unrelated classes - DumpSegment, MaterializedViewQueryToolchest. There's also one [Jacoco failure](https://stackoverflow.com/questions/69906908/jacoco-java-lang-instrument-illegalclassformatexception-error-while-instrumenti), which I think is due to the code coverage check itself.
   Merging the PR, and ensuring that nothing breaks on the master. 




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1898301692

   @gianm 
   > The new context keys add to complexity because there's no mechanism that ensures they are set at the appropriate places, so it's tough to understand how and when they're supposed to be set. They're kind of "magical".
   
   I think my use of keys has been exaggerated, and can probably be minimized if we have a unique key set on the context per query, which can be used to reference the already reserved resources from a globally available pool (with some reservation built in).
   
   With regards to cloning, I don't think there's a real need for it, apart from ensuring that the callers up the stack of query runners don't get a contaminated pool containing keys set by the children. I was envisioning this would be useful when a parent runner makes multiple mergeResults->mergeRunners calls in parallel. The child runner won't have a chance to clean up the response context object before it gets passed to the other runner.
   
   Regarding the above comments, I have come up with the following alternative:
   
   1. `mergeResults` will have a `willMergeRunners` parameter, which will be populated by the callers, and in the production code will only be set to false by the `CachingClusteredClient`. Note: Can this be done by the caching clustered client as well, in case the segment caching is enabled? (Probably so, but GroupByQuery ignores segment caching, so the semantics of the flag will be a bit different from what its name suggests - "will it merge runners")
   2. Having a reservation pool like
   ```java
   // Proposed shape of the pool (signatures only)
   interface GroupByResourcesReservationPool {
       void reserve(String uniqueId, int numMergeBuffers);
       void reserve(String uniqueId, int numMergeBuffers, long timeoutMs);
       GroupByQueryResources take(String uniqueId);
       GroupByQueryResources cleanup(String uniqueId);
   }
   ```




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-2068825265

   Thanks for the reviews @gianm @kgyrtkirk @clintropolis 




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1409028426


##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java:
##########
@@ -469,19 +496,37 @@
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
-    QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1())
-        ),
+    QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<ResultRow>(
+        (queryPlus, responseContext) -> {
+          return toolChest.mergeResults(
+              groupByFactory.mergeRunners(executorService, getRunner1())
+          ).run(
+              queryPlus.withQuery(
+                  queryPlus.getQuery().withOverriddenContext(
+                      ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
+                  )
+              ),
+              responseContext
+          );
+        },
         (QueryToolChest) toolChest
     );
 
-
-    QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            tooSmallGroupByFactory.mergeRunners(executorService, getRunner2())
-        ),
-        (QueryToolChest) toolChest
+    QueryToolChest<ResultRow, GroupByQuery> tooSmalltoolChest = tooSmallGroupByFactory.getToolchest();
+    QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<ResultRow>(
+        (queryPlus, responseContext) -> {
+          return tooSmalltoolChest.mergeResults(
+              tooSmallGroupByFactory.mergeRunners(executorService, getRunner2())

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6016)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java:
##########
@@ -469,19 +496,37 @@
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
-    QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1())
-        ),
+    QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<ResultRow>(
+        (queryPlus, responseContext) -> {
+          return toolChest.mergeResults(
+              groupByFactory.mergeRunners(executorService, getRunner1())

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6015)





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1962747819

   @LakshSingla is this ready for another review or are some other changes pending?




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1878164205

   @LakshSingla TY for the patch! Would you mind leaving an explanation in a comment here about how the fix works? Like:
   
   - what code is responsible for figuring out how many merge buffers to allocate
   - what code is responsible for actually allocating them
   - where they get stored once allocated from the server-wide pool
   - how they get released back to the server-wide pool
   
   It'll aid in reviewing, and aid anyone that comes back later to try to read about the logic.




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1547144457


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -102,35 +104,53 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
   private final GroupingEngine groupingEngine;
   private final GroupByQueryConfig queryConfig;
   private final GroupByQueryMetricsFactory queryMetricsFactory;
+  private final GroupByResourcesReservationPool groupByResourcesReservationPool;
 
   @VisibleForTesting
-  public GroupByQueryQueryToolChest(GroupingEngine groupingEngine)
+  public GroupByQueryQueryToolChest(
+      GroupingEngine groupingEngine,
+      GroupByResourcesReservationPool groupByResourcesReservationPool
+  )
   {
-    this(groupingEngine, GroupByQueryConfig::new, DefaultGroupByQueryMetricsFactory.instance());
+    this(
+        groupingEngine,
+        GroupByQueryConfig::new,
+        DefaultGroupByQueryMetricsFactory.instance(),
+        groupByResourcesReservationPool
+    );
   }
 
   @Inject
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
       Supplier<GroupByQueryConfig> queryConfigSupplier,
-      GroupByQueryMetricsFactory queryMetricsFactory
+      GroupByQueryMetricsFactory queryMetricsFactory,
+      @Merging GroupByResourcesReservationPool groupByResourcesReservationPool

Review Comment:
   True. I reasoned that `@Merging` was used to annotate the global resources associated with group by merging, so I annotated the resource pool with it, but it doesn't have any functional relevance. I am fine with removing it as well.





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1886424908

   I'm hoping we can fix the bug without making the code too much more complex. There are some things that contribute to the complexity, and I'm hoping we can find some simpler alternative:
   
   1. The new context keys add to complexity because there's no mechanism that ensures they are set at the appropriate places, so it's tough to understand how and when they're supposed to be set. They're kind of "magical".
   2. The fact that we are now cloning the response context, and are using these cloned response contexts to store lifecycled objects. It makes it harder to follow the lifecycle of the objects and ensure they are properly passed around and closed.
   
   As an alternative to the context keys, maybe we can either add a parameter to `mergeResults` like `willMergeRunners`, or maybe we can add a new method on the toolchest entirely like `acquireResources(boolean willMergeRunners)`. These are less "magical" than the context keys since Java itself will ensure that the parameters are passed in at the appropriate times. Makes it a lot easier to follow what's going on.
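   
   To make the contrast concrete, here is a rough sketch with hypothetical names and buffer counts (not the PR's code). With a context key, forgetting to set the key silently changes how many buffers get reserved; with an explicit parameter, the caller cannot compile without stating its intent.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch, not Druid code: shows why an explicit parameter is less "magical".
   class ExplicitFlagSketch
   {
     // Hypothetical context key, for illustration only.
     static final String HYPOTHETICAL_CTX_KEY = "runnerMergesUsingMergingRunner";
   
     // Context-key variant: a missing key silently falls back to "false".
     static int buffersViaContext(Map<String, Object> context, int forMergeResults, int forMergeRunners)
     {
       boolean willMergeRunners = Boolean.TRUE.equals(context.get(HYPOTHETICAL_CTX_KEY));
       return forMergeResults + (willMergeRunners ? forMergeRunners : 0);
     }
   
     // Parameter variant: the compiler forces every caller to pass the flag.
     static int buffersViaParameter(boolean willMergeRunners, int forMergeResults, int forMergeRunners)
     {
       return forMergeResults + (willMergeRunners ? forMergeRunners : 0);
     }
   
     public static void main(String[] args)
     {
       // The caller forgot to set the key, so the runner's buffer is not reserved.
       System.out.println(buffersViaContext(new HashMap<>(), 1, 1));
       // The caller had to decide explicitly.
       System.out.println(buffersViaParameter(true, 1, 1));
     }
   }
   ```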
   
   As to the response context, I don't really understand why we're cloning it. What's the reason for that? In any case, a possible alternative could be to put a unique key in the query context (just a string) and then have the various parts of the query use that key to get their resources from something that's injected (like the merge buffer pool is currently injected). Or, an alternative could be to eliminate the cloning and have a requirement that only one `mergeRunners` stack be running at once. I think that's in fact true…
   
   Lastly, for testing, maybe I missed it but I didn't see a test case for the Broker-side scenario. The following query is the simplest one I could come up with that exhibits the problem. Perhaps `ClientQuerySegmentWalkerTest` would be a good place to put a test using a query like this.
   
   ```json
   {
     "queryType": "groupBy",
     "dataSource": {
       "type": "query",
       "query": {
         "queryType": "groupBy",
         "dataSource": {
           "type": "inline",
           "columnNames": [],
           "rows": []
         },
         "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
         "granularity": "all"
       }
     },
     "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
     "granularity": "all"
   }
   ```




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1878165342

   One thing I'm wondering is what we need `GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2` for. It shows up in places that should not need to be aware of the `groupBy` query, which isn't desirable.




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1878359196

   Curious when do we push down a subquery to historical? Where does that logic sit? 




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1966918130

   @gianm I am working on fixing up the ITs and the UTs, which broke due to the changed semantics of resource passing.
   A few issues I ran into were the sharing of the same resource ID on the historicals for union queries, query runners not taking lightly to modified queries, and UTs not populating the query ID due to extensive mocking of the mergeResults + mergeRunners stack. I should be able to correct the ITs shortly.




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1406769405


##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -627,23 +626,26 @@
   @Test
   public void testDescendingNumerics()
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(2))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(2))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6006)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -905,23 +948,25 @@
 
   private List<ResultRow> testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map<String, Object> context)
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(4))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(4))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6010)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -905,23 +948,25 @@
 
   private List<ResultRow> testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map<String, Object> context)
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(4))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(4))
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(5))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(5))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6011)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -764,23 +785,26 @@
   {
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(0))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(0))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6008)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -764,23 +785,26 @@
   {
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(0))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(0))
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(1))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(1))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6009)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -627,23 +626,26 @@
   @Test
   public void testDescendingNumerics()
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(2))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(2))
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(3))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(3))

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6007)





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "kgyrtkirk (via GitHub)" <gi...@apache.org>.
kgyrtkirk commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1407604341


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -127,17 +123,32 @@ public GroupingEngine(
    *
    * @return broker resource
    */
-  public GroupByQueryResources prepareResource(GroupByQuery query)
+  public static GroupByQueryResources prepareResource(
+      GroupByQuery query,
+      BlockingPool<ByteBuffer> mergeBufferPool,
+      boolean usesGroupByMergingQueryRunner,
+      GroupByQueryConfig groupByQueryConfig
+  )
   {
-    final int requiredMergeBufferNum = GroupByQueryResources.countRequiredMergeBufferNum(query);
+
+    final int requiredMergeBufferNumForToolchestMerge =
+        GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query);
+
+    final int requiredMergeBufferNumForMergingQueryRunner =
+        usesGroupByMergingQueryRunner
+        ? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)
+        : 0;

Review Comment:
   I find it a pretty odd contract that it gets counted here and used somewhere else.
   
   I think decoupling things like this could increase complexity significantly.



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java:
##########
@@ -311,39 +310,26 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, ResultRow> iterFromMak
 
   private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
       int numBuffers,
-      boolean hasTimeout,
-      long timeoutAt
+      ResponseContext responseContext
   )
   {
-    try {
-      if (numBuffers > mergeBufferPool.maxSize()) {
-        throw new ResourceLimitExceededException(
-            "Query needs " + numBuffers + " merge buffers, but only "
-            + mergeBufferPool.maxSize() + " merge buffers were configured. "
-            + "Try raising druid.processing.numMergeBuffers."
-        );
-      }
-      final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolder;
-      // This will potentially block if there are no merge buffers left in the pool.
-      if (hasTimeout) {
-        final long timeout = timeoutAt - System.currentTimeMillis();
-        if (timeout <= 0) {
-          throw new QueryTimeoutException();
-        }
-        if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
-          throw new QueryTimeoutException("Cannot acquire enough merge buffers");
-        }
-      } else {
-        mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
-      }
-      return mergeBufferHolder;
+    GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);

Review Comment:
   seems like `responseContext` is used as a backplane here...
   
   





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla merged PR #15420:
URL: https://github.com/apache/druid/pull/15420




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1408780032


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java:
##########
@@ -127,17 +123,32 @@ public GroupingEngine(
    *
    * @return broker resource
    */
-  public GroupByQueryResources prepareResource(GroupByQuery query)
+  public static GroupByQueryResources prepareResource(
+      GroupByQuery query,
+      BlockingPool<ByteBuffer> mergeBufferPool,
+      boolean usesGroupByMergingQueryRunner,
+      GroupByQueryConfig groupByQueryConfig
+  )
   {
-    final int requiredMergeBufferNum = GroupByQueryResources.countRequiredMergeBufferNum(query);
+
+    final int requiredMergeBufferNumForToolchestMerge =
+        GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(query);
+
+    final int requiredMergeBufferNumForMergingQueryRunner =
+        usesGroupByMergingQueryRunner
+        ? GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(groupByQueryConfig, query)
+        : 0;

Review Comment:
   Yes, it is pretty bad. What's also unsettling is that `GroupByMergingQueryRunnerV2` now expects the merge buffers to be provided by someone above it in the chain, which enforces a contract between two different query runners, with only a generic responseContext holding them.
   
   Merge buffers do need a redesign, and I hope the changes introduced in the patch become obsolete.





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1880425801

   To disambiguate between these cases, there's a flag called `CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2`, which determines whether the query will use mergeRunners inside the mergeResults call stack. It defaults to true, and is false only when the broker is merging the results from the data servers.
   
   In the new code, `mergeResults` will acquire all the merge buffers up front, based on the following:
   1. Depending on the query structure, it will acquire the merge buffers needed for the toolchest merge (as was done in the original code).
   2. If the mergeRunners call is nested inside, it will also acquire the merge buffers required by `GroupByMergingQueryRunnerV2` (1 or 2, depending on a config value).
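   
   As a rough illustration of the two-part count in the list above, here is a minimal sketch. The class and method names are hypothetical and not taken from the patch; the per-part counts are passed in as plain integers rather than derived from a real query or config.

```java
/** Hypothetical helper; names are illustrative and not taken from the patch. */
final class MergeBufferCountSketch
{
  private MergeBufferCountSketch() {}

  /**
   * @param forToolchestMerge buffers implied by the query structure (for example, nested subqueries)
   * @param willMergeRunners  whether a mergeRunners call is nested inside this mergeResults call
   * @param forMergingRunner  buffers the merging runner needs (1 or 2, depending on config)
   * @return the number of buffers to reserve in a single acquisition
   */
  static int totalToReserve(int forToolchestMerge, boolean willMergeRunners, int forMergingRunner)
  {
    // The merging runner's share is only added when that runner sits under this mergeResults call.
    return forToolchestMerge + (willMergeRunners ? forMergingRunner : 0);
  }

  public static void main(String[] args)
  {
    // Data-server style call: toolchest merge plus a nested merging runner.
    System.out.println(totalToReserve(1, true, 2));
    // Broker merging results from data servers: no merging runner underneath.
    System.out.println(totalToReserve(1, false, 2));
  }
}
```

   Reserving the sum in one acquisition is what removes the window in which a query holds its toolchest buffers while waiting on the runner's buffers.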
   
   @gianm I can probably change the context parameter to be more generic and apply to all queries (CTX_KEY_USES_MERGE_RUNNERS), although it would be actionable only in the group by toolchest.




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1878919490

   > Curious when do we push down a subquery to historical? Where does that logic sit?
   
   I think @LakshSingla is talking about `forcePushDownNestedQuery`, added in #5471, an undocumented feature.




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1546905734


##########
processing/src/main/java/org/apache/druid/query/FluentQueryRunner.java:
##########
@@ -90,9 +90,9 @@ public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing
     return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
   }
 
-  public FluentQueryRunner<T> mergeResults()
+  public FluentQueryRunner<T> mergeResults(boolean willMergeRunner)

Review Comment:
   Please add javadoc for what `willMergeRunner` means. I recognize most other stuff in here doesn't have javadocs, but, still.



##########
server/src/main/java/org/apache/druid/server/ResourceIdPopulatingQueryRunner.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+/**
+ * Populates {@link org.apache.druid.query.QueryContexts#QUERY_RESOURCE_ID} in the query context
+ */
+public class ResourceIdPopulatingQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> baseRunner;
+
+  public ResourceIdPopulatingQueryRunner(QueryRunner<T> baseRunner)
+  {
+    this.baseRunner = baseRunner;
+  }
+
+  @Override
+  public Sequence<T> run(
+      final QueryPlus<T> queryPlus,
+      final ResponseContext responseContext
+  )
+  {
+    return baseRunner.run(
+        queryPlus.withQuery(
+            ClientQuerySegmentWalker.populateResourceId(queryPlus.getQuery())

Review Comment:
   Would make more sense to me if `populateResourceId` was here, rather than in `ClientQuerySegmentWalker`.



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -118,6 +118,22 @@ public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
     return new ResultMergeQueryRunner<>(runner, this::createResultComparator, this::createMergeFn);
   }
 
+  /**
+   * Like {@link #mergeResults(QueryRunner)}, but additional context parameter to determine whether the input runner
+   * to the method would be the result from the corresponding {@link QueryRunnerFactory#mergeRunners}. Merging can
+   * require additional resources, like merge buffers for group-by queries, therefore the flag, can help
+   * determine if the mergeResults should acquire those resources for the merging runners, before beginning execution.
+   * If not overridden, this method will ignore the {@code willMergeRunner} parameter.
+   *
+   * Ideally {@link #mergeResults(QueryRunner)} should have delegated to this method after setting the default value of

Review Comment:
   Couple questions about this:
   
   - what should the "default value of `willMergeRunner`" be? It seems like a bunch of places use `true`, but why is that?
   - what should _new_ toolchests do? Is it ok to override just the new 2-arg `mergeResults` call, or do both need to be overridden?
   
   I'm hoping we can make this `willMergeRunner` piece more clear, since IMO it's the main unclear thing left in the patch after the last round of changes.
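   
   For reference, the default behaviour that the javadoc describes ("if not overridden, this method will ignore the `willMergeRunner` parameter") can be modeled roughly as below. This is a stripped-down sketch with hypothetical class names; results are plain lists rather than `QueryRunner`s, and the `reserved` counter only stands in for resource acquisition.

```java
import java.util.List;

/** Hypothetical, stripped-down toolchest; not the real QueryToolChest. */
class ToolChestSketch
{
  List<String> mergeResults(List<String> results)
  {
    return results;
  }

  /** Default behaviour: ignore the flag and delegate to the one-arg method. */
  List<String> mergeResults(List<String> results, boolean willMergeRunner)
  {
    return mergeResults(results);
  }
}

/** Hypothetical toolchest that overrides only the two-arg method and acts on the flag. */
class GroupByToolChestSketch extends ToolChestSketch
{
  int reserved = 0;

  @Override
  List<String> mergeResults(List<String> results, boolean willMergeRunner)
  {
    // Stand-in for reserving buffers: one for the toolchest merge, one more if runners are merged below.
    reserved = 1 + (willMergeRunner ? 1 : 0);
    return super.mergeResults(results);
  }
}
```

   Note that in this sketch a caller using the one-arg method on `GroupByToolChestSketch` reserves nothing, which is the kind of ambiguity the questions above are getting at.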



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -102,35 +104,53 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
   private final GroupingEngine groupingEngine;
   private final GroupByQueryConfig queryConfig;
   private final GroupByQueryMetricsFactory queryMetricsFactory;
+  private final GroupByResourcesReservationPool groupByResourcesReservationPool;
 
   @VisibleForTesting
-  public GroupByQueryQueryToolChest(GroupingEngine groupingEngine)
+  public GroupByQueryQueryToolChest(
+      GroupingEngine groupingEngine,
+      GroupByResourcesReservationPool groupByResourcesReservationPool
+  )
   {
-    this(groupingEngine, GroupByQueryConfig::new, DefaultGroupByQueryMetricsFactory.instance());
+    this(
+        groupingEngine,
+        GroupByQueryConfig::new,
+        DefaultGroupByQueryMetricsFactory.instance(),
+        groupByResourcesReservationPool
+    );
   }
 
   @Inject
   public GroupByQueryQueryToolChest(
       GroupingEngine groupingEngine,
       Supplier<GroupByQueryConfig> queryConfigSupplier,
-      GroupByQueryMetricsFactory queryMetricsFactory
+      GroupByQueryMetricsFactory queryMetricsFactory,
+      @Merging GroupByResourcesReservationPool groupByResourcesReservationPool

Review Comment:
   As I understand it, there is no reason to use `@Merging` here, since there's only one kind of `GroupByResourcesReservationPool`. (The annotations are used to disambiguate when there are multiple kinds of some injectable key.)



##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -591,6 +591,10 @@ public boolean isWindowingStrictValidation()
     );
   }
 
+  public String getQueryResourceId()

Review Comment:
   IMO it'd be good to make this a wrapper around String like `ResourceId`. That provides us a central place to put some javadocs that explain how resources work, and link with `@link` and `@see` to other relevant files. It also makes it easier to find usages in an IDE.
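   
   A minimal sketch of what such a wrapper might look like, assuming nothing beyond a String payload. The class name, constructor, and methods are assumptions for illustration only, not the code that ended up in the patch. It implements `equals` and `hashCode` so that it can serve as a map key.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical wrapper around the resource id string; a natural home for javadocs on how resources work. */
final class ResourceIdSketch
{
  private final String id;

  ResourceIdSketch(String id)
  {
    this.id = Objects.requireNonNull(id, "id");
  }

  @Override
  public boolean equals(Object o)
  {
    return o instanceof ResourceIdSketch && id.equals(((ResourceIdSketch) o).id);
  }

  @Override
  public int hashCode()
  {
    return id.hashCode();
  }

  @Override
  public String toString()
  {
    return id;
  }
}

final class ResourceIdDemo
{
  public static void main(String[] args)
  {
    // Value-based equality lets a freshly constructed id find an existing reservation.
    Map<ResourceIdSketch, String> reservations = new ConcurrentHashMap<>();
    reservations.put(new ResourceIdSketch("query-1"), "reserved merge buffers");
    System.out.println(reservations.get(new ResourceIdSketch("query-1")));
  }
}
```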



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+  /**
+   * Map of query's resource id -> group by resources reserved for the query to execute
+   */
+  final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();

Review Comment:
   Use `ResourceId` as key once it exists



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Merging;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
+ */
+public class GroupByResourcesReservationPool
+{
+  /**
+   * Map of query's resource id -> group by resources reserved for the query to execute
+   */
+  final ConcurrentHashMap<String, GroupByQueryResources> pool = new ConcurrentHashMap<>();
+
+  /**
+   * Buffer pool from where the merge buffers are picked and reserved
+   */
+  final BlockingPool<ByteBuffer> mergeBufferPool;
+
+  /**
+   * Group by query config of the server
+   */
+  final GroupByQueryConfig groupByQueryConfig;
+
+  @Inject
+  public GroupByResourcesReservationPool(
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    this.mergeBufferPool = mergeBufferPool;
+    this.groupByQueryConfig = groupByQueryConfig;
+  }
+
+  /**
+   * Reserves appropariate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map

Review Comment:
   appropriate (spelling)





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1878983991

   Yes @abhishekagarwal87, if the flag is set, then the outer query along with the subquery is sent to the historicals. Else, only the subquery is sent to the historical and the broker does the processing on top of the subquery's results. 
   
   @gianm I'll try to add the comments in the code itself, making it more self-explanatory. 




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1881038647

   Regarding acquiring and freeing the merge buffers: the `GroupByQueryResources` instance acquires the required number of merge buffers when it is created. It internally acquires all of them in a single take and separates them into two pools. The individual call sites can then acquire and release merge buffers from their respective pools as needed. When closed, the `GroupByQueryResources` instance releases all of its merge buffers back to the Druid byte buffer pool.
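   
   The lifecycle described above can be sketched roughly as follows. This is a simplified model with hypothetical class names, not the actual Druid classes: the shared pool returns null instead of blocking when too few buffers are free, and `close()` assumes every call site has already handed its buffers back.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for the shared merge buffer pool. */
final class SketchBufferPool
{
  private final Deque<ByteBuffer> free = new ArrayDeque<>();

  SketchBufferPool(int size)
  {
    for (int i = 0; i < size; i++) {
      free.push(ByteBuffer.allocate(16));
    }
  }

  /** All-or-nothing batch take; returns null (rather than blocking) if too few buffers are free. */
  synchronized List<ByteBuffer> takeBatch(int n)
  {
    if (n > free.size()) {
      return null;
    }
    List<ByteBuffer> batch = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      batch.add(free.pop());
    }
    return batch;
  }

  synchronized void giveBack(List<ByteBuffer> buffers)
  {
    buffers.forEach(free::push);
  }

  synchronized int available()
  {
    return free.size();
  }
}

/** Hypothetical per-query reservation: one take from the shared pool, split into two private pools. */
final class SketchReservation implements AutoCloseable
{
  private final SketchBufferPool shared;
  private final List<ByteBuffer> all;
  private final Deque<ByteBuffer> toolchestBuffers = new ArrayDeque<>();
  private final Deque<ByteBuffer> mergingRunnerBuffers = new ArrayDeque<>();

  SketchReservation(SketchBufferPool shared, int forToolchest, int forMergingRunner)
  {
    this.shared = shared;
    this.all = shared.takeBatch(forToolchest + forMergingRunner);
    if (all == null) {
      throw new IllegalStateException("Not enough merge buffers available");
    }
    toolchestBuffers.addAll(all.subList(0, forToolchest));
    mergingRunnerBuffers.addAll(all.subList(forToolchest, all.size()));
  }

  // Call sites take from and return to their own pool; the shared pool is never touched here.
  ByteBuffer takeForToolchest() { return toolchestBuffers.pop(); }
  void returnForToolchest(ByteBuffer b) { toolchestBuffers.push(b); }
  ByteBuffer takeForMergingRunner() { return mergingRunnerBuffers.pop(); }
  void returnForMergingRunner(ByteBuffer b) { mergingRunnerBuffers.push(b); }

  @Override
  public void close()
  {
    // Release everything that was reserved, in one step.
    shared.giveBack(all);
  }
}
```

   Because a query either gets its whole batch or nothing, it never holds part of its buffers while waiting for the rest, which is the property the fix relies on.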




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "kgyrtkirk (via GitHub)" <gi...@apache.org>.
kgyrtkirk commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1830030636

   I've experimented a bit with the above stuff, and although it could work, it's not that much different; it needed a different refactor to be done...




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1500021853


##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java:
##########
@@ -380,16 +387,19 @@
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> tooSmallToolChest = tooSmallGroupByFactory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1())
+            groupByFactory.mergeRunners(executorService, getRunner1()),
+            true
         ),
         (QueryToolChest) toolChest
     );
 
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            tooSmallGroupByFactory.mergeRunners(executorService, getRunner2())
+        tooSmallToolChest.mergeResults(
+            tooSmallGroupByFactory.mergeRunners(executorService, getRunner2()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6740)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -763,23 +795,28 @@
   {
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(0))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(0)),
+            true
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(1))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(1)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6747)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -904,23 +945,27 @@
 
   private List<ResultRow> testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map<String, Object> context)
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(4))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(4)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6748)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -904,23 +945,27 @@
 
   private List<ResultRow> testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map<String, Object> context)
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(4))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(4)),
+            true
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(5))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(5)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6749)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -626,23 +649,28 @@
   @Test
   public void testDescendingNumerics()
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(2))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(2)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6744)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java:
##########
@@ -380,16 +387,19 @@
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> tooSmallToolChest = tooSmallGroupByFactory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1())
+            groupByFactory.mergeRunners(executorService, getRunner1()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6739)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java:
##########
@@ -279,7 +281,8 @@
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, makeGroupByMultiRunners())
+            groupByFactory.mergeRunners(executorService, makeGroupByMultiRunners()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6741)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -626,23 +649,28 @@
   @Test
   public void testDescendingNumerics()
   {
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(2))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(2)),
+            true
         ),
-        (QueryToolChest) toolChest
+        (QueryToolChest) toolChestHistorical
     );
 
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical2 = groupByFactoryHistorical2.getToolchest();
     QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getRunner2(3))
+        toolChestHistorical2.mergeResults(
+            groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(3)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6745)



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java:
##########
@@ -763,23 +795,28 @@
   {
     // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity
 
-    QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChestHistorical = groupByFactoryHistorical.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
-        toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getRunner1(0))
+        toolChestHistorical.mergeResults(
+            groupByFactoryHistorical.mergeRunners(executorService, getRunner1(0)),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6746)



##########
processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java:
##########
@@ -700,17 +702,20 @@
   {
     ResponseContext context = ResponseContext.createEmpty();
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChest2 = groupByFactory2.getToolchest();
     GroupByQuery pushDownQuery = nestedQuery;
     QueryRunner<ResultRow> segment1Runner = new FinalizeResultsQueryRunner<ResultRow>(
         toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1())
+            groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6750)



##########
processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java:
##########
@@ -700,17 +702,20 @@
   {
     ResponseContext context = ResponseContext.createEmpty();
     QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
+    QueryToolChest<ResultRow, GroupByQuery> toolChest2 = groupByFactory2.getToolchest();
     GroupByQuery pushDownQuery = nestedQuery;
     QueryRunner<ResultRow> segment1Runner = new FinalizeResultsQueryRunner<ResultRow>(
         toolChest.mergeResults(
-            groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1())
+            groupByFactory.mergeRunners(executorService, getQueryRunnerForSegment1()),
+            true
         ),
         (QueryToolChest) toolChest
     );
 
     QueryRunner<ResultRow> segment2Runner = new FinalizeResultsQueryRunner<ResultRow>(
-        toolChest.mergeResults(
-            groupByFactory2.mergeRunners(executorService, getQueryRunnerForSegment2())
+        toolChest2.mergeResults(
+            groupByFactory2.mergeRunners(executorService, getQueryRunnerForSegment2()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6751)





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "kgyrtkirk (via GitHub)" <gi...@apache.org>.
kgyrtkirk commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1407916915


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java:
##########
@@ -311,39 +310,26 @@ public void cleanup(CloseableGrouperIterator<RowBasedKey, ResultRow> iterFromMak
 
   private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
       int numBuffers,
-      boolean hasTimeout,
-      long timeoutAt
+      ResponseContext responseContext
   )
   {
-    try {
-      if (numBuffers > mergeBufferPool.maxSize()) {
-        throw new ResourceLimitExceededException(
-            "Query needs " + numBuffers + " merge buffers, but only "
-            + mergeBufferPool.maxSize() + " merge buffers were configured. "
-            + "Try raising druid.processing.numMergeBuffers."
-        );
-      }
-      final List<ReferenceCountingResourceHolder<ByteBuffer>> mergeBufferHolder;
-      // This will potentially block if there are no merge buffers left in the pool.
-      if (hasTimeout) {
-        final long timeout = timeoutAt - System.currentTimeMillis();
-        if (timeout <= 0) {
-          throw new QueryTimeoutException();
-        }
-        if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
-          throw new QueryTimeoutException("Cannot acquire enough merge buffers");
-        }
-      } else {
-        mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
-      }
-      return mergeBufferHolder;
+    GroupByQueryResources resource = (GroupByQueryResources) responseContext.get(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS);

Review Comment:
   @LakshSingla explained that this is a role this class could fill in the future (thx :) ) 





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1880421795

   Adding a comment here as well to aid in the code walkthrough, and for anyone revisiting the PR:
   
   The number of merge buffers required to execute a group by query is calculated based on the structure of the query. There are many levels of merging that can happen. Per my understanding, a raw segment gets aggregated after passing through the following structure:
   
   1. Each individual segment gets aggregated using `GroupByQueryEngineV2#process`. This doesn't use the shared merge buffers. By definition, it is not nested, as it operates on the individual segments (on the data server).
   2. The aggregated results from the multiple segments get partially merged, and combined into a single runner using the `GroupByMergingQueryRunnerV2`, which is sent to the broker. _This uses the shared merge buffers: one or two, depending on the value of the config `numParallelCombineThreads`._ Also, nested calls to this code don't use additional merge buffers; they go through a much more expensive `ChainedExecutionQueryRunner`.
   3. The server calls additional QueryToolchest#mergeResults on the resulting runner to further aggregate the data. This doesn't use the merge buffers because the historical doesn't receive subqueries (see caveat below). 
   4. The broker fetches the results from the multiple data servers, and "merges" them using the CachingClusteredClient.SpecificQueryRunnable#run. This "merge" doesn't aggregate the result objects, it orders the different sequence objects sequentially. 
   5. The broker then calls the final QueryToolChest#mergeResults, which is then decorated upon.
   
   Steps 1-3 happen on the data servers, while 4-5 happen on the brokers. 
   It is worth noting that `GroupByQueryRunnerFactory#mergeRunners` can take up 1-2 merge buffers depending on the value of the config `numParallelCombineThreads`, and the `GroupByQueryQueryToolChest` can take up 0-3 merge buffers depending on the query structure (subqueries and subtotals).
   
   The above describes an ideal world, where there is no nested call between mergeResults and mergeRunners, and therefore a single place where the merge buffers are acquired. However, there are two esoteric cases where this is not true:
   
   1. The historicals get subqueries - Historicals normally get only the innermost query, and the broker does further processing on the returned results of the innermost query. However, if the flag `forcePushDownNestedQuery` is set to true, then the historical can receive a nested query. Steps 2 and 3 in the list above would then both acquire merge buffers. 
   2. The broker operates on an inlined data source - The broker would then emulate a part of the historical's stack, and the broker would have a call stack like mergeResults(mergeRunners..) (see `LocalQuerySegmentWalker`). 
   
   Therefore, in places where there's a nested call stack like mergeResults(.....mergeRunners(....)), the code acquires merge buffers in two places. This is true in:
   1. Data servers in all cases (mergeResults can acquire 0 buffers if the subquery and subtotals are null, as in most cases; however, the subquery can be non-null if it was pushed down with the outer query. Subtotals is always null)
   2. Broker, when the query is to be run on the historicals (again mergeResults can acquire exactly 0 buffers, depending on the structure, but the nested callstack is still there)
   
   The only place where we don't have a nested call stack is when the Broker merges the results from the historicals, wherein mergeRunners is called by the historicals, and mergeResults is called on the "combined" version of the runner returned by the historicals.
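
A minimal sketch of why the nested call stack matters, assuming a hypothetical `BufferPool` backed by a `java.util.concurrent.Semaphore` (this is not Druid's actual pool implementation). Two queries each take one buffer at the mergeResults level and then ask for one more at the mergeRunners level; with only two buffers in the pool, neither inner request can be served while both outer holds remain. Reserving everything in one batch up front avoids the partial hold. The class name, method names, pool size and 50 ms timeout are all illustrative.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class MergeBufferSketch
{
  /** Hypothetical stand-in for a shared merge buffer pool: one permit per buffer. */
  static final class BufferPool
  {
    private final Semaphore permits;

    BufferPool(int size)
    {
      this.permits = new Semaphore(size);
    }

    /** All-or-nothing acquisition; returns false if the timeout expires first. */
    boolean takeBatch(int numBuffers, long timeoutMillis)
    {
      try {
        return permits.tryAcquire(numBuffers, timeoutMillis, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }

    void release(int numBuffers)
    {
      permits.release(numBuffers);
    }
  }

  /** Buffers are acquired in two places per query: both inner requests time out. */
  static boolean nestedAcquisitionStalls()
  {
    BufferPool pool = new BufferPool(2);
    boolean outerA = pool.takeBatch(1, 50); // query A, mergeResults level
    boolean outerB = pool.takeBatch(1, 50); // query B, mergeResults level
    boolean innerA = pool.takeBatch(1, 50); // query A, mergeRunners level: pool is empty
    boolean innerB = pool.takeBatch(1, 50); // query B, mergeRunners level: pool is empty
    return outerA && outerB && !innerA && !innerB;
  }

  /** Each query reserves its full requirement once: queries run one after another. */
  static boolean upfrontAcquisitionCompletes()
  {
    BufferPool pool = new BufferPool(2);
    boolean a = pool.takeBatch(2, 50);            // query A gets both buffers
    boolean bWhileAHolds = pool.takeBatch(2, 50); // query B times out but holds nothing
    pool.release(2);                              // query A completes
    boolean bAfterA = pool.takeBatch(2, 50);      // query B can now proceed
    pool.release(2);
    return a && !bWhileAHolds && bAfterA;
  }

  public static void main(String[] args)
  {
    System.out.println("nested acquisition stalls: " + nestedAcquisitionStalls());
    System.out.println("up-front acquisition completes: " + upfrontAcquisitionCompletes());
  }
}
```

Without the timeout, the two inner requests in the first scenario would wait on each other indefinitely, which is the deadlock this PR is about.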
   
   
   
   
   




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "clintropolis (via GitHub)" <gi...@apache.org>.
clintropolis commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1448191941


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByUtils.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.nio.ByteBuffer;
+
+public class GroupByUtils
+{
+  /**
+   *
+   */
+  public static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
+
+  /**
+   *
+   */
+  public static final String CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 = "runnerMergesUsingGroupByMergingQueryRunnerV2";

Review Comment:
   there are a bunch of query context group by keys in `GroupByQuery`, these should probably live there too instead of adding a new class to hold them. Alternatively, I guess they could have just stayed in GroupByMergingQueryRunnerV2 since they seem kind of specific to its function.
   
   Also please drop the "V2" from this flag name since there is only v2 now



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByUtils.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.nio.ByteBuffer;
+
+public class GroupByUtils
+{
+  /**
+   *
+   */
+  public static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
+
+  /**
+   *
+   */
+  public static final String CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2 = "runnerMergesUsingGroupByMergingQueryRunnerV2";
+
+  /**
+   * Reason for using this is to ensure that we do not set the merge buffers multiple times on the same response context
+   */
+  public static final ResponseContext.Key RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS =

Review Comment:
   this could just live in `GroupingEngine` or somewhere else if we move the query context parameters to `GroupByQuery` (or I suppose could also live with the query context parameters in `GroupByQuery`)
   



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -237,13 +239,16 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<In
           newQuery
       );
     } else if (canRunQueryUsingClusterWalker(newQuery)) {
+      Query<T> queryToRun = newQuery.withOverriddenContext(
+          ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false)
+      );

Review Comment:
   why is this set to false here, and then inside of the cluster walker (which is CachingClusteredClient) it is immediately set to true?
   
   Also, shouldn't there be something for the local walker? The local walker is the only thing on the broker that calls 'mergeRunners' to use the group by merging query runner... otherwise the broker does not use it for any other query shape afaict.



##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -201,9 +202,14 @@ private <T> Sequence<T> run(
       final boolean specificSegments
   )
   {
-    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext)
+    QueryPlus<T> queryPlus1 = queryPlus.withQuery(
+        queryPlus.getQuery().withOverriddenContext(
+            ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
+        )
+    );
+    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus1, responseContext)

Review Comment:
   this seems off to apply it to all query types, when it only affects group by queries. Also, it doesn't really seem quite correct either, since if `CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION` is set, or if 'by segment' is set, mergeRunners does not take a merge buffer.





Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-2047121929

   @gianm 
   Thanks for taking another look. I have addressed the review comments. Regarding the relevance and the use case of the `willMergeRunner`, I have reworded the docs. LMK in case they are still cryptic. The idea is to signal to `mergeResults` that the runner it is being called with will be the `GroupByMergingQueryRunner`, so that it can allocate the resources for that runner as well (hence the name "willMergeRunners", since it is not limited to the groupBy tool chest). 
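
A rough sketch of that idea, under stated assumptions: the `ResourceId` and `ReservationPool` classes below are hypothetical stand-ins written for illustration and are not the classes added in this PR. The outer merge step reserves buffers once per query, including the inner runner's share when the flag is true, and the inner step looks the reservation up instead of acquiring again.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class ReservationSketch
{
  /** Hypothetical id wrapper; usable as a map key because it implements equals and hashCode. */
  static final class ResourceId
  {
    private final String id;

    ResourceId(String id)
    {
      this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof ResourceId && id.equals(((ResourceId) o).id);
    }

    @Override
    public int hashCode()
    {
      return id.hashCode();
    }
  }

  /** Hypothetical pool that records, at most once per query, how many buffers were reserved. */
  static final class ReservationPool
  {
    private final Map<ResourceId, Integer> reserved = new ConcurrentHashMap<>();

    /** Outer step: reserve its own buffers plus the inner runner's, if it will merge runners. */
    void reserve(ResourceId id, int toolChestBuffers, int runnerBuffers, boolean willMergeRunner)
    {
      int total = toolChestBuffers + (willMergeRunner ? runnerBuffers : 0);
      if (reserved.putIfAbsent(id, total) != null) {
        throw new IllegalStateException("Resources were already reserved for this query");
      }
    }

    /** Inner step: look up the existing reservation rather than acquiring again. */
    int fetch(ResourceId id)
    {
      Integer total = reserved.get(id);
      if (total == null) {
        throw new IllegalStateException("No reservation found for this query");
      }
      return total;
    }

    /** Called when the query completes. */
    void clean(ResourceId id)
    {
      reserved.remove(id);
    }
  }

  public static void main(String[] args)
  {
    ReservationPool pool = new ReservationPool();
    ResourceId id = new ResourceId("query-1");
    pool.reserve(id, 1, 2, true);
    System.out.println("reserved buffers: " + pool.fetch(new ResourceId("query-1")));
    pool.clean(id);
  }
}
```

Rejecting a second reservation for the same id is one way to enforce that a query asks the shared pool only once; whether to fail or to reuse the existing reservation is a design choice this sketch does not settle.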




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on PR #15420:
URL: https://github.com/apache/druid/pull/15420#issuecomment-1970680139

   @gianm The PR is ready for review now. There are a few code coverage failures, but I think they are due to making changes in the unrelated classes using the `GroupByQueryQueryToolchest`. 




Re: [PR] Fix deadlock that can occur while merging group by results (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15420:
URL: https://github.com/apache/druid/pull/15420#discussion_r1568243893


##########
processing/src/main/java/org/apache/druid/query/QueryResourceId.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Wrapper class on the queryResourceId string. The object must be addressable on an associative map, therefore it must implement
+ * equals and hashCode.
+ * <p>
+ * Query's resource id is used to allocate the resources, and identify the resources allocated to a query in a global pool.
+ * Queries USUALLY do not share any resources - each query is assigned its own thread, and buffer pool. However, some resources
+ * are shared globally - the GroupBy query's merge buffers being a prime example of those (and the primary utiliser of the
+ * query's resource id). Such resources MUST be allocated once to prevent deadlocks, and can be used throughout the query stack, till
+ * the query holds those resources, or till its completion. A query holding a global resource must not request more of the same
+ * resource, or else it becomes a candidate for deadlocks.
+ * <p>
+ * Each query has a unique resource id, that is assigned to it when it enters the queryable server. This is distinct from
+ * the existing queryId, subqueryId and sqlQueryId in the following ways:
+ * 1. It is not assigned by the user, it is assigned internally for usage by the Druid server
+ * 2. The query's resource id will be unique to the query in the system. The queryId can be non-unique amongst the queries
+ * that are running in the system. Druid must ensure that the queryResourceId is unique. If the user (somehow)
+ * assigns the queryResourceId to the query, it must be overwritten internally.
+ * 3. During the query server <-> data server communication, the queryResourceId assigned to a particular query can (and will)
+ * differ in the query servers and the data servers. This is particularly helpful in case of union queries, where a
+ * single query in the broker can be treated as two separate queries and executed simultaneously in the historicals.
+ * <p>
+ * The queryId is assigned to the query, and populated in the query context at the time it hits the queryable server. In Druid,
+ * there are three queryable servers (classes are not linkable from this method):
+ * 1. {@link org.apache.druid.server.ClientQuerySegmentWalker} - For brokers
+ * 2. {@link org.apache.druid.server.coordination.ServerManager} - For historicals
+ * 3. {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} - For peons & indexer's tasks
+ * <p>
+ * These three classes are among the first places the query reaches when it begins processing, therefore it is
+ * guaranteed that if the resource id is allotted only at these places, no one will overwrite the resource id
+ * during the execution.
+ * <p>
+ * Note: Historicals and Peons could have used the same resource id allotted by the brokers, however they assign their own because:
+ * 1. The user can directly choose to query the data server (while debugging etc.)
+ * 2. UNIONs are treated as multiple separate queries when the broker sends them to the historicals. Therefore, we
+ * require a unique id for each part of the union, and hence we need to reassign the resource id to the query's part,
+ * or else they'll end up sharing the same resource id, as mentioned before
+ * <p>
+ * Notable places where QueryResourceId is used:
+ * <p>
+ * 1. {@link org.apache.druid.query.groupby.GroupByResourcesReservationPool} Primary user of the query resource id.
+ * <p>
+ * 2. {@link org.apache.druid.server.ClientQuerySegmentWalker} Allocates the query resource id on the brokers
+ * <p>
+ * 3. {@link org.apache.druid.server.coordination.ServerManager} Allocates the query resource id on the historicals
+ * <p>
+ * 4. {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} Allocates the query resource id on the peons
+ * (MMs) and indexers
+ * <p>
+ * 5. {@link org.apache.druid.server.ResourceIdPopulatingQueryRunner} Populates the query resource id. ({@link org.apache.druid.server.ClientQuerySegmentWalker}
+ * allocates the query resource id directly, since it also does a bunch of transforms to the query)
+ * <p>
+ * 6. {@link org.apache.druid.query.groupby.GroupByQueryQueryToolChest} Allocates, and associates one of the global resources,
+ * merge buffers, with the query's resource id. It also cleans it up, once the query is completed. Apart from that,
+ * it is also a consumer of the merge buffers it allocates.
+ * <p>
+ * 7. {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner} One of the consumers of the merge buffers
+ * allocated at the beginning of the query
+ *
+ * @see org.apache.druid.query.groupby.GroupByResourcesReservationPool
+ */
+public class QueryResourceId
+{
+  private final String queryResourceId;
+
+  public QueryResourceId(String queryResourceId)
+  {
+    this.queryResourceId = Preconditions.checkNotNull(queryResourceId, "queryResourceId must be present");
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QueryResourceId that = (QueryResourceId) o;
+    return Objects.equals(queryResourceId, that.queryResourceId);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(queryResourceId);
+  }
+
+  @Override
+  public String toString()
+  {

Review Comment:
   Simpler, and will make interpolations nicer, to just do `return queryResourceId;` here.
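
   A minimal sketch of what that suggestion could look like, assuming the rest of the class stays as in the diff. `QueryResourceIdSketch` is a hypothetical stand-in name used only so the snippet compiles on its own, and it uses `java.util.Objects` in place of the Guava `Preconditions` call from the PR:

```java
import java.util.Objects;

// Hypothetical stand-in for the class under review; only toString() is the point here.
final class QueryResourceIdSketch
{
  private final String queryResourceId;

  QueryResourceIdSketch(String queryResourceId)
  {
    // Assumption: plain JDK null check instead of Guava's Preconditions.checkNotNull
    this.queryResourceId = Objects.requireNonNull(queryResourceId, "queryResourceId must be present");
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    QueryResourceIdSketch that = (QueryResourceIdSketch) o;
    return queryResourceId.equals(that.queryResourceId);
  }

  @Override
  public int hashCode()
  {
    return queryResourceId.hashCode();
  }

  @Override
  public String toString()
  {
    // Return the raw id, so the object can be concatenated into messages without a wrapper
    return queryResourceId;
  }

  public static void main(String[] args)
  {
    QueryResourceIdSketch id = new QueryResourceIdSketch("abc-123");
    // The id interpolates as-is into log or error messages
    System.out.println("Reserving merge buffers for query [" + id + "]");
  }
}
```

   With this, a message built by concatenation or `String.format("%s", id)` shows only the id itself, which is the "nicer interpolation" referred to above.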



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

