Posted to dev@druid.apache.org by Michael Schiff <mi...@apache.org> on 2021/07/23 19:18:16 UTC

ItemsSketch Aggregator in druid-datasketches extension

I am looking into implementing a new Aggregator in the datasketches extension using the ItemsSketch in the frequencies package:

https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html
https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/frequencies

I've started on a partial implementation here (still a WIP, lots of TODOs):
https://github.com/apache/druid/compare/master...michaelschiff:fis-aggregator?expand=1

From everything I've seen, it's critical that there is an efficient implementation of BufferAggregator. The existing aggregators take advantage of other sketch types providing "Direct" implementations that operate directly against a ByteBuffer, which makes the BufferAggregator implementation fairly transparent. ItemsSketch is able to serialize itself and to wrap a ByteBuffer for instantiation, but the actual interactions are all on heap (the core of the implementation is https://github.com/apache/datasketches-java/blob/27ecce938555d731f29df97f12f4744a0efb663d/src/main/java/org/apache/datasketches/frequencies/ReversePurgeItemHashMap.java).

Can anyone confirm that it is critical (i.e. the aggregator will not function without it) to have an implementation of BufferAggregator? Assuming it is, we can begin talking with the datasketches team about the possibility of a Direct implementation. I am also thinking of finishing the implementation by explicitly serializing the entire sketch on each update, but this would only be for experimentation, as I doubt this is the intended behavior for implementations of BufferAggregator.
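
For reference, the serialize-on-every-update experiment might look roughly like the sketch below. It is only an illustration under loud assumptions: ToyFrequencyState and RoundTripAggregator are hypothetical names, the state is a plain HashMap rather than a real ItemsSketch, and the init/aggregate/get methods only loosely imitate the shape of a buffer-based aggregator rather than Druid's actual interface. Each slot holds a length prefix followed by the serialized bytes, so every update pays for a full deserialize and reserialize.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a sketch that can serialize itself but only updates on heap.
// This is NOT the DataSketches ItemsSketch API.
final class ToyFrequencyState {
  final Map<String, Long> counts = new HashMap<>();

  void update(String item) {
    counts.merge(item, 1L, Long::sum);
  }

  byte[] toBytes() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(counts.size());
      for (Map.Entry<String, Long> e : counts.entrySet()) {
        out.writeUTF(e.getKey());
        out.writeLong(e.getValue());
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static ToyFrequencyState fromBytes(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      ToyFrequencyState state = new ToyFrequencyState();
      int n = in.readInt();
      for (int i = 0; i < n; i++) {
        String key = in.readUTF();
        state.counts.put(key, in.readLong());
      }
      return state;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical aggregator: slot layout is [int length][serialized state bytes].
final class RoundTripAggregator {
  private final int maxIntermediateSize; // fixed slot size, decided up front

  RoundTripAggregator(int maxIntermediateSize) {
    this.maxIntermediateSize = maxIntermediateSize;
  }

  void init(ByteBuffer buf, int position) {
    write(buf, position, new ToyFrequencyState());
  }

  // Full round trip on every row: read, update on heap, write back.
  void aggregate(ByteBuffer buf, int position, String item) {
    ToyFrequencyState state = read(buf, position);
    state.update(item);
    write(buf, position, state);
  }

  ToyFrequencyState get(ByteBuffer buf, int position) {
    return read(buf, position);
  }

  private ToyFrequencyState read(ByteBuffer buf, int position) {
    byte[] data = new byte[buf.getInt(position)];
    ByteBuffer view = buf.duplicate(); // leave the caller's position untouched
    view.position(position + Integer.BYTES);
    view.get(data);
    return ToyFrequencyState.fromBytes(data);
  }

  private void write(ByteBuffer buf, int position, ToyFrequencyState state) {
    byte[] data = state.toBytes();
    if (Integer.BYTES + data.length > maxIntermediateSize) {
      throw new IllegalStateException("state outgrew its fixed-size slot");
    }
    buf.putInt(position, data.length);
    ByteBuffer view = buf.duplicate();
    view.position(position + Integer.BYTES);
    view.put(data);
  }
}
```

The fixed slot size shows up here as well: once the serialized state no longer fits, this toy version can only fail, which is the variable-size problem in miniature.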

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org


Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by Jon Malkin <jm...@apache.org>.
It might be worth pointing out to them that we don't have a direct
implementation of a sketch storing a variable-length item.

Unless I'm missing something, we have theta and the original quantiles of
double. Maybe they want to check the speed when re-serializing after each
update?

  jon

On Sun, Jul 25, 2021, 10:33 PM leerho <le...@gmail.com> wrote:

> Team, this is an important thread to be aware of. :)
>
> Lee.

Fwd: ItemsSketch Aggregator in druid-datasketches extension

Posted by leerho <le...@gmail.com>.
Team, this is an important thread to be aware of. :)

Lee.


Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by leerho <le...@gmail.com>.
We are busy at the moment trying to get new releases of datasketches-memory
followed by a new release of datasketches-java out the door.  This pair of
releases will give us compatibility with JDK 8 through JDK 13.  After that,
we will work with the Druid team to update the datasketches adaptor in
Druid, followed by a Druid release that can take advantage of this new
capability.

In order to enable the REQ sketch for Druid, it will have to be carefully
updated to enable off-heap (Direct) operation.  Then the Druid datasketches
adaptor will need to be updated to accommodate the new, off-heap REQ
sketch.

As you can tell, this is quite a bit of work.  So contributions into this
effort would be greatly appreciated.  Interested?

Cheers,

Lee.

On Wed, Sep 1, 2021 at 11:51 PM David Glasser <gl...@apollographql.com>
wrote:

> On Thu, Aug 12, 2021 at 8:46 PM leerho <le...@gmail.com> wrote:
> > The non-generic container sketches include the Theta sketch, the double
> > valued Tuple sketch, the double valued Quantiles sketch, the float valued
> > KLL and REQ sketches and the long valued FrequentItemsSketch.  Of these,
> > the Theta family and our original Quantiles sketch already have Direct
> > implementations and are already part of Druid.
>
> Out of curiosity, do you know if anyone is working on integrating the
> REQ sketch into Druid?
>
> --dave

Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by David Glasser <gl...@apollographql.com>.
On Thu, Aug 12, 2021 at 8:46 PM leerho <le...@gmail.com> wrote:
> The non-generic container sketches include the Theta sketch, the double
> valued Tuple sketch, the double valued Quantiles sketch, the float valued
> KLL and REQ sketches and the long valued FrequentItemsSketch.  Of these,
> the Theta family and our original Quantiles sketch already have Direct
> implementations and are already part of Druid.

Out of curiosity, do you know if anyone is working on integrating the
REQ sketch into Druid?

--dave

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@datasketches.apache.org
For additional commands, e-mail: dev-help@datasketches.apache.org


Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by leerho <le...@gmail.com>.
Michael, we haven't implemented off-heap ("Direct") versions of our generic
sketches that can contain arbitrary variable-length items, precisely for
some of the reasons that Gian explains.  These are members of what we call
our "generic container" sketches, in that they retain a sample of
user-defined items from the incoming stream within the sketch itself.  The
generic container sketches include versions of our quantile sketch and the
FrequentItemsSketch.  Since we don't know anything about the size of the
user defined items, we cannot predict how big the sketch will be.  The
second problem is that attempting to store Java Objects faithfully in
off-heap memory is messy and expensive, both in time and space.  Yes, it
can be done using JSON, YAML, Thrift, or Protocol buffers, but we haven't
had the resources to work on any of those solutions.

The non-generic container sketches include the Theta sketch, the double
valued Tuple sketch, the double valued Quantiles sketch, the float valued
KLL and REQ sketches and the long valued FrequentItemsSketch.  Of these,
the Theta family and our original Quantiles sketch already have Direct
implementations and are already part of Druid.

Unfortunately, we don't have a Direct, off-heap version of the float-KLL,
float-REQ or the long valued FrequentItemsSketch.

If you are interested in contributing in any of these areas, we would be
interested in working with you.

Cheers,

Lee.




Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by Michael Schiff <mi...@apache.org>.
Thanks for the detailed answer, Gian! It sounds like the best approach is to pursue a ByteBuffer-based implementation of ItemsSketch, and then use it for an implementation of ItemSketchBuildBufferAggregator(Helper) that does the same reference update if the sketch has to grow.
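
As a rough illustration of that reference-tracking idea (not the actual Druid helper), the sketch below keeps on-heap state keyed by buffer identity and slot position, and moves the reference when a slot is relocated. HeapState and HeapReferenceHelper are hypothetical names, the state is a plain map standing in for a sketch, and the method shapes are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical on-heap state; a real version would hold a sketch object instead.
final class HeapState {
  final Map<String, Long> counts = new HashMap<>();
}

// Hypothetical helper: one on-heap state per (buffer, position) slot.
final class HeapReferenceHelper {
  // Keyed by buffer identity, because ByteBuffer.equals compares remaining content.
  private final IdentityHashMap<ByteBuffer, Map<Integer, HeapState>> states =
      new IdentityHashMap<>();

  void init(ByteBuffer buf, int position) {
    states.computeIfAbsent(buf, b -> new HashMap<>()).put(position, new HeapState());
  }

  void aggregate(ByteBuffer buf, int position, String item) {
    get(buf, position).counts.merge(item, 1L, Long::sum);
  }

  HeapState get(ByteBuffer buf, int position) {
    Map<Integer, HeapState> slots = states.get(buf);
    HeapState state = slots == null ? null : slots.get(position);
    if (state == null) {
      throw new IllegalStateException("slot was never initialized");
    }
    return state;
  }

  // When a slot moves to another buffer or offset, the reference must move with it,
  // otherwise the on-heap state would be lost.
  void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf) {
    Map<Integer, HeapState> oldSlots = states.get(oldBuf);
    HeapState state = oldSlots == null ? null : oldSlots.remove(oldPosition);
    if (state == null) {
      throw new IllegalStateException("nothing to relocate at the old slot");
    }
    if (oldSlots.isEmpty()) {
      states.remove(oldBuf);
    }
    states.computeIfAbsent(newBuf, b -> new HashMap<>()).put(newPosition, state);
  }

  // Drop all references so the on-heap state can be garbage collected.
  void close() {
    states.clear();
  }
}
```

Nothing in this sketch bounds heap usage, which is the risk pointed out for the on-heap-reference approach.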

On 2021/07/23 20:35:52, Gian Merlino <gi...@apache.org> wrote: 
> Hey Michael,
> 
> Very cool!
> 
> To answer your question: it is critical to have a BufferAggregator. Some
> context; there are 3 kinds of aggregators:
> 
> - Aggregator: stores intermediate state on heap; is used during ingestion
> and by the non-vectorized timeseries query engine. Required, or else some
> queries won't work properly.
> - BufferAggregator: stores intermediate state off-heap; is used by the
> non-vectorized topN and groupBy engines. Required, or else some queries
> won't work properly.
> - VectorAggregator: stores intermediate state off-heap; is used by
> vectorized query engines. Optional, but if there are any aggregators in a
> query that don't have a VectorAggregator implementation, then the whole
> query will run non-vectorized, which will slow it down.
> 
> The main reason that BufferAggregators exist is to support off-heap
> aggregation, which minimizes GC pressure and prevents out-of-memory errors
> during the aggregation process.
> 
> > Assuming it is, we can begin talking with the datasketches team about the
> > possibility of a Direct implementation.
> 
> With ItemsSketch, the biggest roadblock you're likely to run into is the
> fact that the items may be of variable size. Currently in Druid each
> BufferAggregator (and VectorAggregator) must have a fixed-size amount of
> space allocated off heap. The way this is done is by returning the number
> of bytes you need from AggregatorFactory.getMaxIntermediateSize. It's all
> allocated upfront. It can depend on aggregator configuration (for example,
> the HLL aggregators need more space if they're configured to have higher
> accuracy) but it can't depend on the data that is encountered during the
> query. So if you do have variable size items, you get into quite a
> conundrum!
> 
> In order to have a "proper" implementation of an off-heap variable-sized
> sketch, we'd need an aggregator API that allows them to allocate additional
> memory at runtime. The DataSketches folks have asked us for this before but
> we haven't built it yet. There's a couple ways you could solve it in the
> meantime:
> 
> 1) Allocate a fixed-size amount of memory that is enough to store most data
> you reasonably expect to encounter, update that off-heap, and allocate
> additional memory on heap if needed, without Druid "knowing" about it. This
> is something that'd require a Direct version of the ItemsSketch. It's a
> technique used by the quantiles sketches too. It's a hack but it works. You
> can see it in action by looking at DirectUpdateDoublesSketch
> <https://github.com/apache/datasketches-java/blob/master/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java>
> in DataSketches (check out the spots where "mem_" is reassigned: it's
> allocating new on-heap memory that Druid doesn't know about) and
> DoublesSketchBuildBufferAggregator in Druid (check out the "sketches" map
> and "relocate" method in DoublesSketchBuildBufferAggregatorHelper: it's
> making sure to retain the references just in case one of them grew into the
> heap).
> 
> 2) Allocate some arbitrary amount of memory but don't use it; instead, use
> a technique like the one in DoublesSketchBuildBufferAggregatorHelper to
> simply maintain references to on-heap sketches. This would work but you
> would run the risk of running out of heap memory. (There's nothing that
> explicitly controls how much heap you'll use.) To a degree you can mitigate
> this by allocating a larger amount of unused off-heap memory via
> AggregatorFactory.getMaxIntermediateSize, which will get Druid to flush
> aggregation state more often (it does this when it runs out of off-heap
> buffer space), which will sort of limit how many sketches are going to be
> in flight at once. It's quite indirect but it's the best I can think of.
> 
> > I am also thinking of finishing the implementation by explicitly
> > serializing the entire sketch on each update, but this would only be for
> > experimentation as I doubt this is the intended behavior for
> > implementations of BufferAggregator.
> 
> You're right, that's not the intended sort of implementation. It works but
> it's usually too slow to be practical. (It incurs a full serialization
> roundtrip for every row.)
> 



Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by Gian Merlino <gi...@apache.org>.
Btw, one additional comment on suggestion (2) above. It's also a hack, of
course, and it's more of a hack than (1) is. At least (1) is using the
proper off-heap storage almost all of the time, which means there is not
much risk of using excessive heap memory. Approach (2) has a higher risk of
using too much heap memory. The only advantage (2) has is that you don't
need a Direct version of the ItemsSketch for it to work.
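
To make the behaviour of (1) concrete, here is a toy sketch of "stay in the preallocated region until the data outgrows it, then quietly move to the heap". SpillingLongList is a made-up class, not DataSketches or Druid code, and a list of longs stands in for real sketch state; the only point is the reassignment of the backing memory.

```java
import java.nio.ByteBuffer;

/** Toy append-only list of longs; a stand-in for a "Direct" sketch, not DataSketches code. */
final class SpillingLongList {
    private final ByteBuffer preallocated; // fixed-size region handed out up front
    private ByteBuffer mem;                // storage currently in use
    private int size;                      // number of longs stored

    SpillingLongList(ByteBuffer preallocated) {
        this.preallocated = preallocated;
        this.mem = preallocated;
    }

    void add(long value) {
        if ((size + 1) * Long.BYTES > mem.capacity()) {
            // Outgrew the current storage: copy into a larger on-heap buffer and
            // switch to it. The owner of the preallocated region is never told.
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(Long.BYTES, mem.capacity() * 2));
            for (int i = 0; i < size; i++) {
                bigger.putLong(i * Long.BYTES, mem.getLong(i * Long.BYTES));
            }
            mem = bigger;
        }
        mem.putLong(size * Long.BYTES, value);
        size++;
    }

    long get(int index) {
        if (index < 0 || index >= size) {
            throw new IndexOutOfBoundsException("index " + index + ", size " + size);
        }
        return mem.getLong(index * Long.BYTES);
    }

    int size() {
        return size;
    }

    /** True once the data has moved out of the preallocated region onto the heap. */
    boolean hasSpilled() {
        return mem != preallocated;
    }
}
```

As long as the preallocated region is sized for typical data, hasSpilled() stays false and no extra heap is used, which is why (1) carries less heap risk than (2).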


Re: ItemsSketch Aggregator in druid-datasketches extension

Posted by Gian Merlino <gi...@apache.org>.
Hey Michael,

Very cool!

To answer your question: it is critical to have a BufferAggregator. Some
context: there are three kinds of aggregators:

- Aggregator: stores intermediate state on heap; is used during ingestion
and by the non-vectorized timeseries query engine. Required, or else some
queries won't work properly.
- BufferAggregator: stores intermediate state off-heap; is used by the
non-vectorized topN and groupBy engines. Required, or else some queries
won't work properly.
- VectorAggregator: stores intermediate state off-heap; is used by
vectorized query engines. Optional, but if there are any aggregators in a
query that don't have a VectorAggregator implementation, then the whole
query will run non-vectorized, which will slow it down.

The main reason that BufferAggregators exist is to support off-heap
aggregation, which minimizes GC pressure and prevents out-of-memory errors
during the aggregation process.

> Assuming it is, we can begin talking with the datasketches team about the
> possibility of a Direct implementation.

With ItemsSketch, the biggest roadblock you're likely to run into is the
fact that the items may be of variable size. Currently in Druid each
BufferAggregator (and VectorAggregator) must have a fixed-size amount of
space allocated off heap. The way this is done is by returning the number
of bytes you need from AggregatorFactory.getMaxIntermediateSize. It's all
allocated upfront. It can depend on aggregator configuration (for example,
the HLL aggregators need more space if they're configured to have higher
accuracy) but it can't depend on the data that is encountered during the
query. So if you do have variable size items, you get into quite a
conundrum!
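
A rough illustration of that constraint, with made-up names rather than Druid's real classes: ToyRegisterFactory mimics only the getMaxIntermediateSize idea, and the "4-byte header plus 2^lgK one-byte registers" layout is an assumption for the example, not the actual HLL format. The size is a function of configuration alone, so every slot in the preallocated buffer has the same width.

```java
import java.nio.ByteBuffer;

/** Toy stand-in for an aggregator factory; not Druid's actual AggregatorFactory. */
final class ToyRegisterFactory {
    private final int lgK; // configuration: log2 of the number of one-byte registers

    ToyRegisterFactory(int lgK) {
        this.lgK = lgK;
    }

    /** Depends only on configuration, never on the rows seen during the query. */
    int getMaxIntermediateSize() {
        return Integer.BYTES + (1 << lgK); // assumed layout: 4-byte header + 2^lgK registers
    }
}

final class FixedSlotDemo {
    /** Byte offset of a slot, given that every slot has the same fixed size. */
    static int slotPosition(int slotIndex, int slotSize) {
        return slotIndex * slotSize;
    }

    /** Number of whole slots that fit in a buffer allocated up front. */
    static int slotCapacity(ByteBuffer buffer, int slotSize) {
        return buffer.capacity() / slotSize;
    }
}
```

With lgK = 4 each slot takes 20 bytes and with lgK = 10 it takes 1028 bytes, but in neither case can a slot grow once rows start arriving. A sketch holding variable-length items has no such upper bound to report.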

In order to have a "proper" implementation of an off-heap variable-sized
sketch, we'd need an aggregator API that allows aggregators to allocate
additional memory at runtime. The DataSketches folks have asked us for this
before but we haven't built it yet. There are a couple of ways you could
solve it in the meantime:

1) Allocate a fixed-size amount of memory that is enough to store most data
you reasonably expect to encounter, update that off-heap, and allocate
additional memory on heap if needed, without Druid "knowing" about it. This
is something that'd require a Direct version of the ItemsSketch. It's a
technique used by the quantiles sketches too. It's a hack but it works. You
can see it in action by looking at DirectUpdateDoublesSketch
<https://github.com/apache/datasketches-java/blob/master/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java>
in DataSketches (check out the spots where "mem_" is reassigned: it's
allocating new on-heap memory that Druid doesn't know about) and
DoublesSketchBuildBufferAggregator in Druid (check out the "sketches" map
and "relocate" method in DoublesSketchBuildBufferAggregatorHelper: it's
making sure to retain the references just in case one of them grew into the
heap).

2) Allocate some arbitrary amount of memory but don't use it; instead, use
a technique like the one in DoublesSketchBuildBufferAggregatorHelper to
simply maintain references to on-heap sketches. This would work but you
would run the risk of running out of heap memory. (There's nothing that
explicitly controls how much heap you'll use.) To a degree you can mitigate
this by allocating a larger amount of unused off-heap memory via
AggregatorFactory.getMaxIntermediateSize, which will get Druid to flush
aggregation state more often (it does this when it runs out of off-heap
buffer space), which will sort of limit how many sketches are going to be
in flight at once. It's quite indirect but it's the best I can think of.
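
A minimal sketch of the reference-keeping that both options lean on, under some loud assumptions: OnHeapSketchRegistry and ToyFrequencySketch are hypothetical names, the toy sketch keeps exact counts in place of a real on-heap ItemsSketch, and aggregate takes the item directly, whereas a real BufferAggregator would read it from a column selector. The method shapes loosely follow BufferAggregator's init/aggregate/get/relocate.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

/** Toy on-heap "sketch" with exact counts; stands in for a real frequent-items sketch. */
final class ToyFrequencySketch {
    private final Map<String, Long> counts = new HashMap<>();

    void update(String item) {
        counts.merge(item, 1L, Long::sum);
    }

    long estimate(String item) {
        return counts.getOrDefault(item, 0L);
    }
}

/** Keeps on-heap sketches addressed by (buffer identity, position). Hypothetical helper. */
final class OnHeapSketchRegistry {
    // Identity map: ByteBuffer.equals/hashCode compare contents, which is not what we want.
    private final IdentityHashMap<ByteBuffer, Map<Integer, ToyFrequencySketch>> sketches =
        new IdentityHashMap<>();

    void init(ByteBuffer buf, int position) {
        sketches.computeIfAbsent(buf, b -> new HashMap<>()).put(position, new ToyFrequencySketch());
    }

    /** Returns null if no sketch is registered at this buffer and position. */
    ToyFrequencySketch get(ByteBuffer buf, int position) {
        Map<Integer, ToyFrequencySketch> byPosition = sketches.get(buf);
        return byPosition == null ? null : byPosition.get(position);
    }

    /** Assumes init was called for this slot; the bytes in buf are never touched. */
    void aggregate(ByteBuffer buf, int position, String item) {
        get(buf, position).update(item);
    }

    /** Called when a slot moves; only the map entry moves, not the sketch. */
    void relocate(int oldPosition, int newPosition, ByteBuffer oldBuf, ByteBuffer newBuf) {
        Map<Integer, ToyFrequencySketch> oldMap = sketches.get(oldBuf);
        ToyFrequencySketch sketch = oldMap.remove(oldPosition);
        if (oldMap.isEmpty()) {
            sketches.remove(oldBuf);
        }
        sketches.computeIfAbsent(newBuf, b -> new HashMap<>()).put(newPosition, sketch);
    }

    void close() {
        sketches.clear();
    }
}
```

Nothing here bounds heap use: every initialized slot pins one on-heap object until close() or until the slot is reused, which is the risk described for option 2.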

> I am also thinking of finishing the implementation by explicitly
> serializing the entire sketch on each update, but this would only be for
> experimentation as I doubt this is the intended behavior for
> implementations of BufferAggregator.

You're right, that's not the intended sort of implementation. It works but
it's usually too slow to be practical. (It incurs a full serialization
roundtrip for every row.)
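
To show where the cost comes from, here is a toy version of the serialize-on-every-update approach. RoundTripCounter and its byte layout are invented for illustration, and a plain map of counts stands in for the sketch; it is not how ItemsSketch serializes itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy aggregator state that lives only in serialized form inside the buffer. */
final class RoundTripCounter {
    // Assumed layout at position: int entryCount, then for each entry:
    // int keyLength, key bytes (UTF-8), long count.

    static void init(ByteBuffer buf, int position) {
        buf.putInt(position, 0);
    }

    static Map<String, Long> read(ByteBuffer buf, int position) {
        ByteBuffer view = buf.duplicate(); // independent cursor, shared bytes
        view.position(position);
        int entries = view.getInt();
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < entries; i++) {
            byte[] key = new byte[view.getInt()];
            view.get(key);
            counts.put(new String(key, StandardCharsets.UTF_8), view.getLong());
        }
        return counts;
    }

    static void write(ByteBuffer buf, int position, Map<String, Long> counts) {
        ByteBuffer view = buf.duplicate();
        view.position(position);
        view.putInt(counts.size());
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            // Throws BufferOverflowException if the state outgrows the buffer.
            view.putInt(key.length).put(key).putLong(e.getValue());
        }
    }

    /** One row costs a full deserialize plus a full serialize of the whole state. */
    static void aggregate(ByteBuffer buf, int position, String item) {
        Map<String, Long> counts = read(buf, position);
        counts.merge(item, 1L, Long::sum);
        write(buf, position, counts);
    }
}
```

Each call to aggregate does work proportional to the size of the whole state rather than to the single item being added, so the per-row cost grows with the sketch.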
