Posted to dev@druid.apache.org by Jisoo Kim <ji...@snap.com.INVALID> on 2018/07/19 21:42:45 UTC

Question on GroupBy query results merging process

Hi all,

I am currently working on a project that uses Druid's QueryRunner and other
druid-processing classes. It uses Druid's own classes to calculate query
results. I have been testing large GroupBy queries (using v2), and it seems
like parallel combining threads for GroupBy queries are only enabled on the
historical level. I think parallel combining is only invoked via
GroupByStrategyV2.mergeRunners()
<https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335>
which is only called by GroupByQueryRunnerFactory.mergeRunners() on
historicals.

Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for
computing and merging per-segment results only, or can they also be used on
the broker level? I changed the logic of my project from calling
queryToolChest.mergeResults() on MergeSequence (created by providing a list
of per-segment/per-server sequences) to calling
queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners() (where
each runner returns a deserialized result sequence), and that seemed to
reduce computation time and failures for really heavy groupBy queries by
quite a lot. Or is this just a coincidence and there shouldn't be a
performance difference in merging groupBy query results, with the only
difference coming from parallelizing the deserialization of result
sequences from sub-queries?

Thanks,
Jisoo

Re: Question on GroupBy query results merging process

Posted by Jihoon Son <gh...@gmail.com>.
Hi Jisoo,

no worries. Questions are always welcome.

> How does queryToolChest.mergeResults() assume the input is sorted by
grouping keys? It seems like it's using ResultMergeQueryRunner with the
query's row ordering within the method, and queryToolChest.mergeResults()
gets called in ServerManager as well, where it uses
queryRunnerFactory.mergeRunners(). The difference between ServerManager and
ClientQuerySegmentWalker/CachingClusteredClient seems to be that
CachingClusteredClient returns a MergeSequence with
query.getResultOrdering(), which I think uses rowOrdering. Is the use of
MergeSequence what's making the difference? In that case, is it ok
for ServerManager to call queryToolChest.mergeResults() on top of
queryRunnerFactory.mergeRunners()? I am curious about what differentiates
the Druid broker from historicals when it comes to merging the sub-query
results.

The thing is, GroupByMergingQueryRunnerV2.run(), which is called in
GroupByQueryRunnerFactory.mergeRunners(), always returns the sorted result
(please check RowBasedGrouperHelper.makeGrouperIterator()). So, the result
from each historical is already sorted. If you call
queryRunnerFactory.mergeRunners() on these sorted results, it first
performs hash aggregation and then sorts them again.

queryToolChest.mergeResults(queryRunnerFactory.mergeRunners(exec,
queryRunners)) in ServerManager should be fine. It's just merging all
results from each segment and returning them to the broker.

MergeSequence is just used to merge several sequences into a single one and
iterate over the items in those sequences in order. So, the items in each
sequence must be sorted before being consumed by MergeSequence.
CombiningSequence consumes the items from MergeSequence and performs
sorted-merge aggregation (in ResultMergeQueryRunner).
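
To make that concrete, here is a minimal, self-contained sketch (toy
types only, not Druid's actual MergeSequence/CombiningSequence code) of
what the two do conceptually: a k-way merge of per-node results that are
already sorted by grouping key, combining adjacent rows with equal keys
as they stream out, so nothing has to be re-sorted or fully buffered:

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.PriorityQueue;

    // Toy illustration only. Each "node result" is a list of (key, value)
    // rows already sorted by grouping key, like the sorted intermediates
    // a historical returns. The priority queue does what MergeSequence
    // does conceptually; the inner "combine equal keys" loop is what
    // CombiningSequence does conceptually.
    public class SortedMergeCombineSketch
    {
      private static class Cursor
      {
        String key;
        long value;
        final Iterator<Map.Entry<String, Long>> rest;

        Cursor(Iterator<Map.Entry<String, Long>> rest)
        {
          this.rest = rest;
          advance();
        }

        boolean advance()
        {
          if (!rest.hasNext()) {
            return false;
          }
          final Map.Entry<String, Long> e = rest.next();
          key = e.getKey();
          value = e.getValue();
          return true;
        }
      }

      public static List<Map.Entry<String, Long>> mergeAndCombine(
          List<List<Map.Entry<String, Long>>> sortedNodeResults
      )
      {
        final PriorityQueue<Cursor> queue =
            new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.key));
        for (List<Map.Entry<String, Long>> rows : sortedNodeResults) {
          if (!rows.isEmpty()) {
            queue.add(new Cursor(rows.iterator()));
          }
        }

        final List<Map.Entry<String, Long>> out = new ArrayList<>();
        while (!queue.isEmpty()) {
          final Cursor head = queue.poll();
          final String key = head.key;
          long sum = head.value;
          if (head.advance()) {
            queue.add(head);
          }
          // All remaining rows with this key sit at the head of the queue
          // because every input is sorted; combine them before emitting.
          while (!queue.isEmpty() && queue.peek().key.equals(key)) {
            final Cursor other = queue.poll();
            sum += other.value;
            if (other.advance()) {
              queue.add(other);
            }
          }
          out.add(new AbstractMap.SimpleEntry<>(key, sum));
        }
        return out;
      }
    }

The precondition is the whole point: this only produces correct,
already-combined output because every input is sorted by the grouping
key.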

> Could you please point me to the code for this?

I'm not sure what code you mean, but here are some things you might be
interested in:

- GroupByQueryEngineV2.process() is called to process each segment. This
uses hash aggregation by default (BufferHashGrouper). It can use
array-based aggregation (BufferArrayGrouper) if the number of grouping keys
is 1.
- GroupByMergingQueryRunnerV2.run() can call GroupByQueryEngineV2.process()
concurrently per segment. ConcurrentGrouper is used to allow concurrent
aggregation. Once the aggregation completes,
GroupByMergingQueryRunnerV2.run() sorts and returns the result
(RowBasedGrouperHelper.makeGrouperIterator()). The sort can also be done in
parallel (ConcurrentGrouper.iterator()). ParallelCombiner can optionally be
used to combine the sorted per-segment results (a simplified sketch of this
hash-then-sort flow follows below).
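
Concretely (toy types again, not the real Grouper classes): hash-aggregate
the rows of one segment, then sort the grouped entries by key so the
upstream merge can consume sorted streams:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy sketch of the per-segment flow, not Druid's Grouper classes:
    // hash aggregation first (fast, unordered), then a sort of the
    // grouped entries so the next level can do a cheap sorted merge.
    // Note that the hash step materializes every group of the segment
    // before anything can be emitted.
    public class PerSegmentHashThenSortSketch
    {
      public static List<Map.Entry<String, Long>> aggregateSegment(
          Iterable<Map.Entry<String, Long>> segmentRows
      )
      {
        // Hash aggregation: group rows by key, in no particular order.
        final Map<String, Long> groups = new HashMap<>();
        for (Map.Entry<String, Long> row : segmentRows) {
          groups.merge(row.getKey(), row.getValue(), Long::sum);
        }

        // Sort by grouping key so the result can feed a sorted merge.
        final List<Map.Entry<String, Long>> sorted =
            new ArrayList<>(groups.entrySet());
        sorted.sort(Map.Entry.comparingByKey());
        return sorted;
      }
    }

Doing this per segment (or per historical) is cheap; doing it once more
over everything on the broker is what would force the full
materialization of intermediates that was discussed earlier in this
thread.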

Hope this helps.

Best,
Jihoon

On Wed, Jul 25, 2018 at 4:25 PM Jisoo Kim <ji...@snap.com.invalid>
wrote:

> Hi Jihoon,
>
> Thanks for the explanation, but I think I need some more clarification on
> the reason not to use queryRunnerFactory.mergeRunners().
>
> > In v2, the broker assumes that intermediate aggregates from historicals
> are
> always sorted by grouping keys. This enables merge-sorted aggregation on
> brokers which is much more efficient than hash aggregation in terms of
> speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
> works based on hash aggregation. This would cause the same issue of groupBy
> v1 which requires the full materialization of intermediates on brokers
> (either in memory or on disk).
>
> Could you please point me to the code for this?
>
> > Also, this requires to sort the result
> of queryRunnerFactory.mergeRunners() before
> calling queryToolChest.mergeResults() since it assumes the input is sorted
> by grouping keys.
>
> How does queryToolChest.mergeResults() assumes the input is sorted by
> grouping keys? Seems like it's using ResultMergeQueryRunner with query's
> row ordering within the method, and queryToolChest.mergeResults() gets
> called in ServerManager as well where it uses
> queryRunnerFactory.mergeRunners(). The difference between ServerManager and
> ClientQuerySegmentWalker/CachingClusteredClient seems to be that
> CachingClusteredClient returns MergeSequence with
> query.getResultOrdering(), which I think uses rowOrdering. Is the use of
> MergeSequence the one that's making the difference? In that case, is it ok
> for ServerManager to call queryToolChest.mergeResults() on top of
> queryRunnerFactory.mergeRunners()? I am curious about what differentiates
> the Druid broker from historicals when it comes to merging the sub query
> results.
>
> Sorry for many questions and thanks,
> Jisoo
>
> On Sat, Jul 21, 2018 at 7:28 PM, Jihoon Son <gh...@gmail.com> wrote:
>
> > Hi Jisoo,
> >
> > I think it would work, but there is currently at least one reason to not
> > use queryRunnerFactory.mergeRunners() in groupBy v2.
> >
> > In v2, the broker assumes that intermediate aggregates from historicals
> are
> > always sorted by grouping keys. This enables merge-sorted aggregation on
> > brokers which is much more efficient than hash aggregation in terms of
> > speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
> > works based on hash aggregation. This would cause the same issue of
> groupBy
> > v1 which requires the full materialization of intermediates on brokers
> > (either in memory or on disk). Also, this requires to sort the result
> > of queryRunnerFactory.mergeRunners() before
> > calling queryToolChest.mergeResults() since it assumes the input is
> sorted
> > by grouping keys.
> >
> > > I am wondering if the improvement that I gained was from changing logic
> > was mainly from deserializing the sub-query results in parallel (by
> calling
> > queryRunnerFactory.mergeRunners() which seems to enable parallelism), or
> > if
> > it was also benefitting from using GroupByMergingQueryRunnerV2 that has
> > parallel combining threads enabled.
> >
> > I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
> > (it's disabled by default). Then, it's difficult to tell where you gained
> > the benefit. You might ned to run more benchmarks to figure out.
> >
> > Best,
> > Jihoon
> >
> > On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim <ji...@snap.com.invalid>
> > wrote:
> >
> > > Hi Jihoon,
> > >
> > > Thanks for the reply. So what I ended up doing for merging a list of
> > > serialized result Sequences (which is a byte array) was:
> > >
> > > 1) Create a stream of  out of the list
> > > 2) For each serialized sequence in a list, create a query runner that
> > > deserializes the byte array and returns a Sequence (along with applying
> > > PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
> > > 3) Call queryRunnerFactory.mergeRunners() (factory is created from the
> > > injector and given query) on the materialized list of QueryRunner
> > > 4) Create a FluentQueryRunner out of 3) and add necessary steps
> including
> > > mergeResults(), which essentially calls queryToolChest.mergeResults()
> on
> > > queryRunnerFactory.mergeRunners()
> > >
> > > Does my approach look valid or is it something that I shouldn't be
> doing
> > > for merging query results? Before I changed the merging logic to the
> > above
> > > I encountered a problem with merging sub-query results properly for
> very
> > > heavy groupBy queries.
> > >
> > > I haven't had much chance to read through all the group by query
> > processing
> > > logic, but I am wondering if the improvement that I gained was from
> > > changing logic was mainly from deserializing the sub-query results in
> > > parallel (by calling queryRunnerFactory.mergeRunners() which seems to
> > > enable parallelism), or if it was also benefitting from using
> > > GroupByMergingQueryRunnerV2 that has parallel combining threads
> enabled.
> > >
> > > Thanks,
> > > Jisoo
> > >
> > > On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <gh...@gmail.com>
> wrote:
> > >
> > > > Hi Jisoo,
> > > >
> > > > sorry, the previous email was sent by accident.
> > > >
> > > > The initial version of groupBy v2 wasn't capable of combining
> > > intermediates
> > > > in parallel. Some of our customers met the similar issue to yours,
> and
> > > so I
> > > > was working on improving groupBy v2 performance for a while.
> > > >
> > > > Parallel combining on brokers definitely makes sense. I was thinking
> to
> > > add
> > > > a sort of ParallelMergeSequence which is a parallel version of
> > > > MergeSequence, but it can be anything if it supports parallel
> combining
> > > on
> > > > brokers.
> > > >
> > > > One thing I'm worrying about is, most query processing interfaces in
> > > > brokers are using Sequence, and thus using another stuff for a
> specific
> > > > query type might make the codes complicated. I think we need to avoid
> > it
> > > if
> > > > possible.
> > > >
> > > > Best,
> > > > Jihoon
> > > >
> > > > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <gh...@gmail.com>
> wrote:
> > > >
> > > > > Hi Jisoo,
> > > > >
> > > > > the initial version of groupBy v2
> > > > >
> > > > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim
> <jisoo.kim@snap.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> I am currently working on a project that uses Druid's QueryRunner
> > and
> > > > >> other
> > > > >> druid-processing classes. It uses Druid's own classes to calculate
> > > query
> > > > >> results. I have been testing large GroupBy queries (using v2), and
> > it
> > > > >> seems
> > > > >> like parallel combining threads for GroupBy queries are only
> enabled
> > > on
> > > > >> the
> > > > >> historical level. I think it is only getting called by
> > > > >> GroupByStrategyV2.mergeRunners()
> > > > >> <
> > > > >> https://github.com/apache/incubator-druid/blob/druid-0.
> > > > 12.1/processing/src/main/java/io/druid/query/groupby/
> > > > strategy/GroupByStrategyV2.java#L335
> > > > >> >
> > > > >> which is only called by GroupByQueryRunnerFactory.mergeRunners()
> on
> > > > >> historicals.
> > > > >>
> > > > >> Are GroupByMergingQueryRunnerV2 and parallel combining threads
> meant
> > > for
> > > > >> computing and merging per-segment results only, or can they also
> be
> > > used
> > > > >> on
> > > > >> the broker level? I changed the logic of my project from calling
> > > > >> queryToolChest.mergeResults() on MergeSequence (created by
> > providing a
> > > > >> list
> > > > >> of per-segment/per-server sequences) to calling
> > > > >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> > > > (where
> > > > >> each runner returns a deserialized result sequence), and that
> seemed
> > > to
> > > > >> have reduced really heavy groupby query computation time or
> failures
> > > by
> > > > >> quite a lot. Or is this just a coincidence and there shouldn't be
> a
> > > > >> performance difference in merging groupby query results, and the
> > only
> > > > >> difference could've been by parallelizing the deserialization of
> > > result
> > > > >> sequences from sub-queries?
> > > > >>
> > > > >> Thanks,
> > > > >> Jisoo
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: Question on GroupBy query results merging process

Posted by Jisoo Kim <ji...@snap.com.INVALID>.
Hi Jihoon,

Thanks for the explanation, but I think I need some more clarification on
the reason not to use queryRunnerFactory.mergeRunners().

> In v2, the broker assumes that intermediate aggregates from historicals
are
always sorted by grouping keys. This enables merge-sorted aggregation on
brokers which is much more efficient than hash aggregation in terms of
speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
works based on hash aggregation. This would cause the same issue of groupBy
v1 which requires the full materialization of intermediates on brokers
(either in memory or on disk).

Could you please point me to the code for this?

> Also, this requires to sort the result
of queryRunnerFactory.mergeRunners() before
calling queryToolChest.mergeResults() since it assumes the input is sorted
by grouping keys.

How does queryToolChest.mergeResults() assume the input is sorted by
grouping keys? It seems like it's using ResultMergeQueryRunner with the
query's row ordering within the method, and queryToolChest.mergeResults()
gets called in ServerManager as well, where it uses
queryRunnerFactory.mergeRunners(). The difference between ServerManager and
ClientQuerySegmentWalker/CachingClusteredClient seems to be that
CachingClusteredClient returns a MergeSequence with
query.getResultOrdering(), which I think uses rowOrdering. Is the use of
MergeSequence what's making the difference? In that case, is it ok
for ServerManager to call queryToolChest.mergeResults() on top of
queryRunnerFactory.mergeRunners()? I am curious about what differentiates
the Druid broker from historicals when it comes to merging the sub-query
results.

Sorry for the many questions, and thanks,
Jisoo

On Sat, Jul 21, 2018 at 7:28 PM, Jihoon Son <gh...@gmail.com> wrote:

> Hi Jisoo,
>
> I think it would work, but there is currently at least one reason to not
> use queryRunnerFactory.mergeRunners() in groupBy v2.
>
> In v2, the broker assumes that intermediate aggregates from historicals are
> always sorted by grouping keys. This enables merge-sorted aggregation on
> brokers which is much more efficient than hash aggregation in terms of
> speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
> works based on hash aggregation. This would cause the same issue of groupBy
> v1 which requires the full materialization of intermediates on brokers
> (either in memory or on disk). Also, this requires to sort the result
> of queryRunnerFactory.mergeRunners() before
> calling queryToolChest.mergeResults() since it assumes the input is sorted
> by grouping keys.
>
> > I am wondering if the improvement that I gained was from changing logic
> was mainly from deserializing the sub-query results in parallel (by calling
> queryRunnerFactory.mergeRunners() which seems to enable parallelism), or
> if
> it was also benefitting from using GroupByMergingQueryRunnerV2 that has
> parallel combining threads enabled.
>
> I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
> (it's disabled by default). Then, it's difficult to tell where you gained
> the benefit. You might ned to run more benchmarks to figure out.
>
> Best,
> Jihoon
>
> On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim <ji...@snap.com.invalid>
> wrote:
>
> > Hi Jihoon,
> >
> > Thanks for the reply. So what I ended up doing for merging a list of
> > serialized result Sequences (which is a byte array) was:
> >
> > 1) Create a stream of  out of the list
> > 2) For each serialized sequence in a list, create a query runner that
> > deserializes the byte array and returns a Sequence (along with applying
> > PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
> > 3) Call queryRunnerFactory.mergeRunners() (factory is created from the
> > injector and given query) on the materialized list of QueryRunner
> > 4) Create a FluentQueryRunner out of 3) and add necessary steps including
> > mergeResults(), which essentially calls queryToolChest.mergeResults() on
> > queryRunnerFactory.mergeRunners()
> >
> > Does my approach look valid or is it something that I shouldn't be doing
> > for merging query results? Before I changed the merging logic to the
> above
> > I encountered a problem with merging sub-query results properly for very
> > heavy groupBy queries.
> >
> > I haven't had much chance to read through all the group by query
> processing
> > logic, but I am wondering if the improvement that I gained was from
> > changing logic was mainly from deserializing the sub-query results in
> > parallel (by calling queryRunnerFactory.mergeRunners() which seems to
> > enable parallelism), or if it was also benefitting from using
> > GroupByMergingQueryRunnerV2 that has parallel combining threads enabled.
> >
> > Thanks,
> > Jisoo
> >
> > On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <gh...@gmail.com> wrote:
> >
> > > Hi Jisoo,
> > >
> > > sorry, the previous email was sent by accident.
> > >
> > > The initial version of groupBy v2 wasn't capable of combining
> > intermediates
> > > in parallel. Some of our customers met the similar issue to yours, and
> > so I
> > > was working on improving groupBy v2 performance for a while.
> > >
> > > Parallel combining on brokers definitely makes sense. I was thinking to
> > add
> > > a sort of ParallelMergeSequence which is a parallel version of
> > > MergeSequence, but it can be anything if it supports parallel combining
> > on
> > > brokers.
> > >
> > > One thing I'm worrying about is, most query processing interfaces in
> > > brokers are using Sequence, and thus using another stuff for a specific
> > > query type might make the codes complicated. I think we need to avoid
> it
> > if
> > > possible.
> > >
> > > Best,
> > > Jihoon
> > >
> > > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <gh...@gmail.com> wrote:
> > >
> > > > Hi Jisoo,
> > > >
> > > > the initial version of groupBy v2
> > > >
> > > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <jisoo.kim@snap.com.invalid
> >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I am currently working on a project that uses Druid's QueryRunner
> and
> > > >> other
> > > >> druid-processing classes. It uses Druid's own classes to calculate
> > query
> > > >> results. I have been testing large GroupBy queries (using v2), and
> it
> > > >> seems
> > > >> like parallel combining threads for GroupBy queries are only enabled
> > on
> > > >> the
> > > >> historical level. I think it is only getting called by
> > > >> GroupByStrategyV2.mergeRunners()
> > > >> <
> > > >> https://github.com/apache/incubator-druid/blob/druid-0.
> > > 12.1/processing/src/main/java/io/druid/query/groupby/
> > > strategy/GroupByStrategyV2.java#L335
> > > >> >
> > > >> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> > > >> historicals.
> > > >>
> > > >> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant
> > for
> > > >> computing and merging per-segment results only, or can they also be
> > used
> > > >> on
> > > >> the broker level? I changed the logic of my project from calling
> > > >> queryToolChest.mergeResults() on MergeSequence (created by
> providing a
> > > >> list
> > > >> of per-segment/per-server sequences) to calling
> > > >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> > > (where
> > > >> each runner returns a deserialized result sequence), and that seemed
> > to
> > > >> have reduced really heavy groupby query computation time or failures
> > by
> > > >> quite a lot. Or is this just a coincidence and there shouldn't be a
> > > >> performance difference in merging groupby query results, and the
> only
> > > >> difference could've been by parallelizing the deserialization of
> > result
> > > >> sequences from sub-queries?
> > > >>
> > > >> Thanks,
> > > >> Jisoo
> > > >>
> > > >
> > >
> >
>

Re: Question on GroupBy query results merging process

Posted by Jihoon Son <gh...@gmail.com>.
Hi Jisoo,

I think it would work, but there is currently at least one reason to not
use queryRunnerFactory.mergeRunners() in groupBy v2.

In v2, the broker assumes that intermediate aggregates from historicals are
always sorted by grouping keys. This enables merge-sorted aggregation on
brokers, which is much more efficient than hash aggregation in terms of
speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
works based on hash aggregation. This would cause the same issue as groupBy
v1, which requires the full materialization of intermediates on brokers
(either in memory or on disk). Also, this would require sorting the result
of queryRunnerFactory.mergeRunners() before
calling queryToolChest.mergeResults(), since it assumes the input is sorted
by grouping keys.

> I am wondering if the improvement that I gained was from changing logic
was mainly from deserializing the sub-query results in parallel (by calling
queryRunnerFactory.mergeRunners() which seems to enable parallelism), or if
it was also benefitting from using GroupByMergingQueryRunnerV2 that has
parallel combining threads enabled.

I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
(it's disabled by default). Then, it's difficult to tell where you gained
the benefit. You might need to run more benchmarks to figure it out.
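
For reference, parallel combining is controlled by a groupBy v2 tuning
option (if I recall correctly, druid.query.groupBy.numParallelCombineThreads,
default 1, which can also be overridden per query via the
numParallelCombineThreads context key; please double-check the groupBy
config docs for your Druid version before relying on the exact names). A
hedged sketch of a per-query override:

    import java.util.HashMap;
    import java.util.Map;

    // Hedged sketch: the context key below is assumed from the groupBy v2
    // tuning docs; verify it against your Druid version. Parallel
    // combining stays disabled unless the value is greater than 1.
    public class ParallelCombineContextSketch
    {
      public static Map<String, Object> queryContext()
      {
        final Map<String, Object> context = new HashMap<>();
        context.put("numParallelCombineThreads", 4);
        return context;
      }
    }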

Best,
Jihoon

On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim <ji...@snap.com.invalid>
wrote:

> Hi Jihoon,
>
> Thanks for the reply. So what I ended up doing for merging a list of
> serialized result Sequences (which is a byte array) was:
>
> 1) Create a stream of  out of the list
> 2) For each serialized sequence in a list, create a query runner that
> deserializes the byte array and returns a Sequence (along with applying
> PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
> 3) Call queryRunnerFactory.mergeRunners() (factory is created from the
> injector and given query) on the materialized list of QueryRunner
> 4) Create a FluentQueryRunner out of 3) and add necessary steps including
> mergeResults(), which essentially calls queryToolChest.mergeResults() on
> queryRunnerFactory.mergeRunners()
>
> Does my approach look valid or is it something that I shouldn't be doing
> for merging query results? Before I changed the merging logic to the above
> I encountered a problem with merging sub-query results properly for very
> heavy groupBy queries.
>
> I haven't had much chance to read through all the group by query processing
> logic, but I am wondering if the improvement that I gained was from
> changing logic was mainly from deserializing the sub-query results in
> parallel (by calling queryRunnerFactory.mergeRunners() which seems to
> enable parallelism), or if it was also benefitting from using
> GroupByMergingQueryRunnerV2 that has parallel combining threads enabled.
>
> Thanks,
> Jisoo
>
> On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <gh...@gmail.com> wrote:
>
> > Hi Jisoo,
> >
> > sorry, the previous email was sent by accident.
> >
> > The initial version of groupBy v2 wasn't capable of combining
> intermediates
> > in parallel. Some of our customers met the similar issue to yours, and
> so I
> > was working on improving groupBy v2 performance for a while.
> >
> > Parallel combining on brokers definitely makes sense. I was thinking to
> add
> > a sort of ParallelMergeSequence which is a parallel version of
> > MergeSequence, but it can be anything if it supports parallel combining
> on
> > brokers.
> >
> > One thing I'm worrying about is, most query processing interfaces in
> > brokers are using Sequence, and thus using another stuff for a specific
> > query type might make the codes complicated. I think we need to avoid it
> if
> > possible.
> >
> > Best,
> > Jihoon
> >
> > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <gh...@gmail.com> wrote:
> >
> > > Hi Jisoo,
> > >
> > > the initial version of groupBy v2
> > >
> > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <ji...@snap.com.invalid>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I am currently working on a project that uses Druid's QueryRunner and
> > >> other
> > >> druid-processing classes. It uses Druid's own classes to calculate
> query
> > >> results. I have been testing large GroupBy queries (using v2), and it
> > >> seems
> > >> like parallel combining threads for GroupBy queries are only enabled
> on
> > >> the
> > >> historical level. I think it is only getting called by
> > >> GroupByStrategyV2.mergeRunners()
> > >> <
> > >> https://github.com/apache/incubator-druid/blob/druid-0.
> > 12.1/processing/src/main/java/io/druid/query/groupby/
> > strategy/GroupByStrategyV2.java#L335
> > >> >
> > >> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> > >> historicals.
> > >>
> > >> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant
> for
> > >> computing and merging per-segment results only, or can they also be
> used
> > >> on
> > >> the broker level? I changed the logic of my project from calling
> > >> queryToolChest.mergeResults() on MergeSequence (created by providing a
> > >> list
> > >> of per-segment/per-server sequences) to calling
> > >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> > (where
> > >> each runner returns a deserialized result sequence), and that seemed
> to
> > >> have reduced really heavy groupby query computation time or failures
> by
> > >> quite a lot. Or is this just a coincidence and there shouldn't be a
> > >> performance difference in merging groupby query results, and the only
> > >> difference could've been by parallelizing the deserialization of
> result
> > >> sequences from sub-queries?
> > >>
> > >> Thanks,
> > >> Jisoo
> > >>
> > >
> >
>

Re: Question on GroupBy query results merging process

Posted by Jisoo Kim <ji...@snap.com.INVALID>.
Hi Jihoon,

Thanks for the reply. So what I ended up doing for merging a list of
serialized result Sequences (each of which is a byte array) was:

1) Create a stream out of the list of serialized sequences
2) For each serialized sequence in the list, create a query runner that
deserializes the byte array and returns a Sequence (along with applying
PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
3) Call queryRunnerFactory.mergeRunners() (the factory is created from the
injector and the given query) on the materialized list of QueryRunner
4) Create a FluentQueryRunner out of 3) and add the necessary steps,
including mergeResults(), which essentially calls
queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
(a simplified sketch of these steps follows below)
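
For illustration, a simplified, self-contained sketch of steps 1) through
3) (toy types, not the actual QueryRunner/Sequence plumbing; the "rows"
here are just key/value pairs encoded as UTF-8 lines): each serialized
per-server result is deserialized in its own task, so the deserialization
of all sub-query results runs in parallel before they are merged:

    import java.nio.charset.StandardCharsets;
    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Toy sketch, not the real QueryRunner/Sequence plumbing. Each
    // serialized per-server result (here just "key,value" lines) gets
    // its own deserialization task, mirroring step 2); the resulting
    // per-server row lists would then be handed to the merge in step 3).
    public class ParallelDeserializeSketch
    {
      public static List<List<Map.Entry<String, Long>>> deserializeAll(
          List<byte[]> serializedResults,
          int threads
      ) throws InterruptedException, ExecutionException
      {
        final ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
          final List<Future<List<Map.Entry<String, Long>>>> futures =
              new ArrayList<>();
          for (byte[] serialized : serializedResults) {
            final Callable<List<Map.Entry<String, Long>>> task =
                () -> deserialize(serialized);
            futures.add(exec.submit(task));
          }
          final List<List<Map.Entry<String, Long>>> perServerRows =
              new ArrayList<>();
          for (Future<List<Map.Entry<String, Long>>> f : futures) {
            perServerRows.add(f.get());
          }
          return perServerRows;
        } finally {
          exec.shutdown();
        }
      }

      private static List<Map.Entry<String, Long>> deserialize(byte[] bytes)
      {
        final List<Map.Entry<String, Long>> rows = new ArrayList<>();
        final String text = new String(bytes, StandardCharsets.UTF_8);
        for (String line : text.split("\n")) {
          if (line.isEmpty()) {
            continue;
          }
          final String[] parts = line.split(",", 2);
          rows.add(new AbstractMap.SimpleEntry<>(
              parts[0],
              Long.parseLong(parts[1])
          ));
        }
        return rows;
      }
    }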

Does my approach look valid or is it something that I shouldn't be doing
for merging query results? Before I changed the merging logic to the above
I encountered a problem with merging sub-query results properly for very
heavy groupBy queries.

I haven't had much chance to read through all the groupBy query processing
logic, but I am wondering if the improvement I gained from changing the
logic came mainly from deserializing the sub-query results in parallel (by
calling queryRunnerFactory.mergeRunners(), which seems to enable
parallelism), or if it was also benefiting from using
GroupByMergingQueryRunnerV2 with parallel combining threads enabled.

Thanks,
Jisoo

On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <gh...@gmail.com> wrote:

> Hi Jisoo,
>
> sorry, the previous email was sent by accident.
>
> The initial version of groupBy v2 wasn't capable of combining intermediates
> in parallel. Some of our customers met the similar issue to yours, and so I
> was working on improving groupBy v2 performance for a while.
>
> Parallel combining on brokers definitely makes sense. I was thinking to add
> a sort of ParallelMergeSequence which is a parallel version of
> MergeSequence, but it can be anything if it supports parallel combining on
> brokers.
>
> One thing I'm worrying about is, most query processing interfaces in
> brokers are using Sequence, and thus using another stuff for a specific
> query type might make the codes complicated. I think we need to avoid it if
> possible.
>
> Best,
> Jihoon
>
> On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <gh...@gmail.com> wrote:
>
> > Hi Jisoo,
> >
> > the initial version of groupBy v2
> >
> > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <ji...@snap.com.invalid>
> > wrote:
> >
> >> Hi all,
> >>
> >> I am currently working on a project that uses Druid's QueryRunner and
> >> other
> >> druid-processing classes. It uses Druid's own classes to calculate query
> >> results. I have been testing large GroupBy queries (using v2), and it
> >> seems
> >> like parallel combining threads for GroupBy queries are only enabled on
> >> the
> >> historical level. I think it is only getting called by
> >> GroupByStrategyV2.mergeRunners()
> >> <
> >> https://github.com/apache/incubator-druid/blob/druid-0.
> 12.1/processing/src/main/java/io/druid/query/groupby/
> strategy/GroupByStrategyV2.java#L335
> >> >
> >> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> >> historicals.
> >>
> >> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for
> >> computing and merging per-segment results only, or can they also be used
> >> on
> >> the broker level? I changed the logic of my project from calling
> >> queryToolChest.mergeResults() on MergeSequence (created by providing a
> >> list
> >> of per-segment/per-server sequences) to calling
> >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> (where
> >> each runner returns a deserialized result sequence), and that seemed to
> >> have reduced really heavy groupby query computation time or failures by
> >> quite a lot. Or is this just a coincidence and there shouldn't be a
> >> performance difference in merging groupby query results, and the only
> >> difference could've been by parallelizing the deserialization of result
> >> sequences from sub-queries?
> >>
> >> Thanks,
> >> Jisoo
> >>
> >
>

Re: Question on GroupBy query results merging process

Posted by Jihoon Son <gh...@gmail.com>.
Hi Jisoo,

sorry, the previous email was sent by accident.

The initial version of groupBy v2 wasn't capable of combining intermediates
in parallel. Some of our customers ran into a similar issue to yours, and so
I have been working on improving groupBy v2 performance for a while.

Parallel combining on brokers definitely makes sense. I was thinking of
adding a sort of ParallelMergeSequence, which would be a parallel version of
MergeSequence, but it could be anything that supports parallel combining on
brokers.
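
Purely as a sketch of that idea (nothing like this exists in Druid today),
the sorted, already-combined per-historical results could be merged and
combined pairwise on an executor, so the broker-side combine gets some
parallelism instead of a single-threaded k-way merge:

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical sketch of a "parallel merge" for sorted per-node
    // results whose keys are already unique within each input. Pairs of
    // inputs are merged on separate threads, round by round, until one
    // combined, still-sorted result remains.
    public class ParallelMergeSketch
    {
      public static List<Map.Entry<String, Long>> mergeAll(
          List<List<Map.Entry<String, Long>>> sortedResults,
          int threads
      ) throws InterruptedException, ExecutionException
      {
        final ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
          List<List<Map.Entry<String, Long>>> current = sortedResults;
          while (current.size() > 1) {
            final List<Future<List<Map.Entry<String, Long>>>> futures =
                new ArrayList<>();
            for (int i = 0; i + 1 < current.size(); i += 2) {
              final List<Map.Entry<String, Long>> left = current.get(i);
              final List<Map.Entry<String, Long>> right = current.get(i + 1);
              final Callable<List<Map.Entry<String, Long>>> task =
                  () -> mergeTwo(left, right);
              futures.add(exec.submit(task));
            }
            final List<List<Map.Entry<String, Long>>> next = new ArrayList<>();
            for (Future<List<Map.Entry<String, Long>>> f : futures) {
              next.add(f.get());
            }
            if (current.size() % 2 == 1) {
              // The odd input out simply waits for the next round.
              next.add(current.get(current.size() - 1));
            }
            current = next;
          }
          return current.isEmpty() ? new ArrayList<>() : current.get(0);
        } finally {
          exec.shutdown();
        }
      }

      // Classic sorted merge of two runs, combining rows with equal keys.
      private static List<Map.Entry<String, Long>> mergeTwo(
          List<Map.Entry<String, Long>> a,
          List<Map.Entry<String, Long>> b
      )
      {
        final List<Map.Entry<String, Long>> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
          if (j >= b.size()
              || (i < a.size()
                  && a.get(i).getKey().compareTo(b.get(j).getKey()) < 0)) {
            out.add(a.get(i++));
          } else if (i >= a.size()
              || a.get(i).getKey().compareTo(b.get(j).getKey()) > 0) {
            out.add(b.get(j++));
          } else {
            out.add(new AbstractMap.SimpleEntry<>(
                a.get(i).getKey(),
                a.get(i).getValue() + b.get(j).getValue()
            ));
            i++;
            j++;
          }
        }
        return out;
      }
    }

The pairwise rounds are only one possible scheme; the point is that each
merge of two sorted, already-combined inputs is independent and can run on
its own thread, which is the kind of parallelism a ParallelMergeSequence
would give the broker.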

One thing I'm worried about is that most query processing interfaces in
brokers use Sequence, so introducing something different for a specific
query type might complicate the code. I think we need to avoid that if
possible.

Best,
Jihoon

On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <gh...@gmail.com> wrote:

> Hi Jisoo,
>
> the initial version of groupBy v2
>
> On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <ji...@snap.com.invalid>
> wrote:
>
>> Hi all,
>>
>> I am currently working on a project that uses Druid's QueryRunner and
>> other
>> druid-processing classes. It uses Druid's own classes to calculate query
>> results. I have been testing large GroupBy queries (using v2), and it
>> seems
>> like parallel combining threads for GroupBy queries are only enabled on
>> the
>> historical level. I think it is only getting called by
>> GroupByStrategyV2.mergeRunners()
>> <
>> https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335
>> >
>> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
>> historicals.
>>
>> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for
>> computing and merging per-segment results only, or can they also be used
>> on
>> the broker level? I changed the logic of my project from calling
>> queryToolChest.mergeResults() on MergeSequence (created by providing a
>> list
>> of per-segment/per-server sequences) to calling
>> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners() (where
>> each runner returns a deserialized result sequence), and that seemed to
>> have reduced really heavy groupby query computation time or failures by
>> quite a lot. Or is this just a coincidence and there shouldn't be a
>> performance difference in merging groupby query results, and the only
>> difference could've been by parallelizing the deserialization of result
>> sequences from sub-queries?
>>
>> Thanks,
>> Jisoo
>>
>

Re: Question on GroupBy query results merging process

Posted by Jihoon Son <gh...@gmail.com>.
Hi Jisoo,

the initial version of groupBy v2

On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <ji...@snap.com.invalid>
wrote:

> Hi all,
>
> I am currently working on a project that uses Druid's QueryRunner and other
> druid-processing classes. It uses Druid's own classes to calculate query
> results. I have been testing large GroupBy queries (using v2), and it seems
> like parallel combining threads for GroupBy queries are only enabled on the
> historical level. I think it is only getting called by
> GroupByStrategyV2.mergeRunners()
> <
> https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335
> >
> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> historicals.
>
> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant for
> computing and merging per-segment results only, or can they also be used on
> the broker level? I changed the logic of my project from calling
> queryToolChest.mergeResults() on MergeSequence (created by providing a list
> of per-segment/per-server sequences) to calling
> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners() (where
> each runner returns a deserialized result sequence), and that seemed to
> have reduced really heavy groupby query computation time or failures by
> quite a lot. Or is this just a coincidence and there shouldn't be a
> performance difference in merging groupby query results, and the only
> difference could've been by parallelizing the deserialization of result
> sequences from sub-queries?
>
> Thanks,
> Jisoo
>