You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by Eshcar Hillel <es...@oath.com.INVALID> on 2018/07/10 09:09:54 UTC

Question about sketches aggregation in druid

Hi All,
My name is Eshcar Hillel from Oath research. I'm currently working with Lee Rhodes on committing a new concurrent implementation of the theta sketch to the sketches-core library.I was wondering whether this implementation can help boost the union operation that is applied to multiple sketches at query time in druid.From what I see in the code the sketch aggregator uses the SynchronizedUnion implementation, which basically uses a lock at every single access (update/read) of the union operation. We believe a thread-safe implementation of the union operation can help decrease the inherent overhead of the lock.
I will be happy to join the meeting today and briefly discuss this option.
Thanks,Eshcar   



Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 Thanks Himanshu!
I will update when we have the ConcurrentUnion in the DataSketches library, or earlier if we get interesting performance results with the union implementations.
    On Tuesday, July 24, 2018, 8:39:25 PM GMT+3, Himanshu <g....@gmail.com> wrote:  
 
 This came up in the dev sync today.

Here is the gist.

- Union is necessary because we merge sketches in multiple cases e.g. at
query time, while persisting the final segment to be pushed to deep storage
, indexing user's data that itself contains sketches (e.g. someone ran
batch pipelines with Pig etc and created data that already has sketches in
it).
- SynchronizedUnion is necessary only because of realtime indexing and
querying use case as Gian mentioned ( it is a single writer - multiple
readers use case). If we have a ConcurrentUnion implementation that
performs as good as non-thread-safe Union for single thread, then we should
totally be able to remove SynchronizedUnion.

-- Himanshu

On Sun, Jul 22, 2018 at 9:10 AM, Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  I think part of my confusion stems from the gap in the level of
> abstraction.Druid terminology focuses on aggregation.But in sketches there
> are two levels of aggregation:A sketch is a first level aggregation which
> holds the gist of the stream,
> A union is a second level aggregation which can aggregate sketches.
> Adding 2 questions to the questions below:
> The locks are used to synchronize the access to the union - I assume the
> union is a second level aggregation merging sketches that are built during
> ingestion.
> 3) If this is not the case then why does druid apply a union and not
> simply uses a sketch to aggregate the data?4) if it is the case then is it
> guaranteed that the merged sketches are immutable? otherwise wrapping the
> union with locks is not enough.
>
> I hope my questions make more sense now.
> Thanks,Eshcar
>
>    On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel <
> eshcar@oath.com> wrote:
>
>  Thanks Gian - I was missing the part about aggregation during ingestion
> time roll-up.
> I looked at the SketchAggregator code and read the druid overview document
> let me verify that I got this right.Consider the best effort roll up mode:
> as events arrive they are ingested into multiple segments, call these
> s0...s9, but should belong to a single segment.Then a roll-up process ru
> aggregates s0...s9 one-by-one (?) into a single segment. During the roll up
> ru can be queried and therefore needs to be thread safe.
> 1) who is the "owner" of the roll up process? what triggers the roll-up
> thread? Is it considered as part of the ingestion/indexing time, or is it
> done at the background as a kind of an optimization?
>
> 2) The documents says "Data is queryable as soon as it isingested by the
> realtime processing logic." Does this means that queries can apply get to
> s0..s9? should they be thread safe as well?
>
>
>    On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> I don't think I 100% understand what you are asking, but I will say some
> things, and hopefully they will be helpful.
>
> In Druid we use aggregators for two things: aggregation during ingestion
> (for ingestion-time rollup) and aggregation during queries. During queries
> the aggregators are only ever used by one thread at a time. At ingestion
> time, "aggregate" and "get" can be called simultaneously. It happens
> because "aggregate" is called from an ingestion thread (because we update
> running aggregators during ingestion), and "get" is called by query threads
> (because they "get" those aggregator values from the ingestion aggregator
> object to feed them to a query aggregator object). These calls are not
> synchronized by Druid, so individual aggregators need to do it themselves
> if necessary. There was some effort to address this systematically:
> https://github.com/apache/incubator-druid/pull/3956, although it hasn't
> been finished yet. Check out some of the discussion on that patch for more
> background, and a question I just posted there: does it make more sense for
> ingestion-time aggregator thread-safety to be handled systematically (at
> the IncrementalIndex) or for each aggregator to need to be thread safe?
>
> If you're looking at "aggregate" and "get" in this file, those are the two
> that could get called simultaneously:
> https://github.com/apache/incubator-druid/blob/master/
> extensions-core/datasketches/src/main/java/io/druid/query/
> aggregation/datasketches/theta/SketchAggregator.java
>
> On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> >  Apologies, I must be missing something very basic in how incremental
> > indexing is working.A sketch is by itself an aggregator - it can absorb
> > millions of updates before it exceeds its space limit or is flushed to
> disk.
> > I assumed the ingestion thread aggregates data in multiple sketches in
> > parallel, then at query time a union operation is invoked to merge
> relevant
> > sketches based on the attributes of the query, and when the union is
> > completed its result is returned to the user. But in such scenario there
> is
> > no need to call get before the union is completed.
> > This means there is another scenario where union is used and can be
> > queried while in the process of executing the merge. Is this to maintain
> > some in-memory hierarchy of aggregations? or for creating the snapshots
> > that are flushed to disk?
> > A better understanding of the use case will help in presenting a better
> > thread-safe solution.
> > Thanks,Eshcar
> >
> >    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> > gian@apache.org> wrote:
> >
> >  Hi Eshcar,
> >
> > > But even in a single-writer-single-reader scenario removing the lock
> can
> > increase the throughput of accesses to the object.
> >
> > Definitely worth trying this out, imo.
> >
> > > However, I don't understand why is the union object read before the
> > result is ready.
> >
> > It's used as part of incremental indexing: the idea is that we create
> > aggregates during ingestion time and we want those to be queryable even
> > while ingestion is still ongoing. So the ingestion thread will be calling
> > "aggregate" and a query thread will be calling "get" potentially
> > simultaneously.
> >
> > On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
> > wrote:
> >
> > >  Thanks Gian,
> > > This is also my understanding.But even in a single-writer-single-reader
> > > scenario removing the lock can increase the throughput of accesses to
> the
> > > object.
> > > If the union is only used to produce the result at query time then
> > > removing the lock would not affect ingestion throughput, but could
> > decrease
> > > query latency.However, I don't understand why is the union object read
> > > before the result is ready.
> > >    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> > > gian@apache.org> wrote:
> > >
> > >  Hi Eshcar,
> > >
> > > To my knowledge, in the Druid Aggregator and BufferAggregator
> interfaces,
> > > the main place where concurrency happens is that "aggregate" and "get"
> > may
> > > be called simultaneously during realtime ingestion. So if there would
> be
> > a
> > > benefit from improving concurrency it would probably end up in that
> area.
> > >
> > > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <eshcar@oath.com.invalid
> >
> > > wrote:
> > >
> > > > Hi All,
> > > > My name is Eshcar Hillel from Oath research. I'm currently working
> with
> > > > Lee Rhodes on committing a new concurrent implementation of the theta
> > > > sketch to the sketches-core library.I was wondering whether this
> > > > implementation can help boost the union operation that is applied to
> > > > multiple sketches at query time in druid.From what I see in the code
> > the
> > > > sketch aggregator uses the SynchronizedUnion implementation, which
> > > > basically uses a lock at every single access (update/read) of the
> union
> > > > operation. We believe a thread-safe implementation of the union
> > operation
> > > > can help decrease the inherent overhead of the lock.
> > > > I will be happy to join the meeting today and briefly discuss this
> > > option.
> > > > Thanks,Eshcar
> > > >
> > > >
> > > >
> > >
> >
>
>
  

Re: Question about sketches aggregation in druid

Posted by Himanshu <g....@gmail.com>.
This came up in the dev sync today.

Here is the gist.

- Union is necessary because we merge sketches in multiple cases e.g. at
query time, while persisting the final segment to be pushed to deep storage
, indexing user's data that itself contains sketches (e.g. someone ran
batch pipelines with Pig etc and created data that already has sketches in
it).
- SynchronizedUnion is necessary only because of realtime indexing and
querying use case as Gian mentioned ( it is a single writer - multiple
readers use case). If we have a ConcurrentUnion implementation that
performs as good as non-thread-safe Union for single thread, then we should
totally be able to remove SynchronizedUnion.

-- Himanshu

On Sun, Jul 22, 2018 at 9:10 AM, Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  I think part of my confusion stems from the gap in the level of
> abstraction.Druid terminology focuses on aggregation.But in sketches there
> are two levels of aggregation:A sketch is a first level aggregation which
> holds the gist of the stream,
> A union is a second level aggregation which can aggregate sketches.
> Adding 2 questions to the questions below:
> The locks are used to synchronize the access to the union - I assume the
> union is a second level aggregation merging sketches that are built during
> ingestion.
> 3) If this is not the case then why does druid apply a union and not
> simply uses a sketch to aggregate the data?4) if it is the case then is it
> guaranteed that the merged sketches are immutable? otherwise wrapping the
> union with locks is not enough.
>
> I hope my questions make more sense now.
> Thanks,Eshcar
>
>     On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel <
> eshcar@oath.com> wrote:
>
>   Thanks Gian - I was missing the part about aggregation during ingestion
> time roll-up.
> I looked at the SketchAggregator code and read the druid overview document
> let me verify that I got this right.Consider the best effort roll up mode:
> as events arrive they are ingested into multiple segments, call these
> s0...s9, but should belong to a single segment.Then a roll-up process ru
> aggregates s0...s9 one-by-one (?) into a single segment. During the roll up
> ru can be queried and therefore needs to be thread safe.
> 1) who is the "owner" of the roll up process? what triggers the roll-up
> thread? Is it considered as part of the ingestion/indexing time, or is it
> done at the background as a kind of an optimization?
>
> 2) The documents says "Data is queryable as soon as it isingested by the
> realtime processing logic." Does this means that queries can apply get to
> s0..s9? should they be thread safe as well?
>
>
>     On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> I don't think I 100% understand what you are asking, but I will say some
> things, and hopefully they will be helpful.
>
> In Druid we use aggregators for two things: aggregation during ingestion
> (for ingestion-time rollup) and aggregation during queries. During queries
> the aggregators are only ever used by one thread at a time. At ingestion
> time, "aggregate" and "get" can be called simultaneously. It happens
> because "aggregate" is called from an ingestion thread (because we update
> running aggregators during ingestion), and "get" is called by query threads
> (because they "get" those aggregator values from the ingestion aggregator
> object to feed them to a query aggregator object). These calls are not
> synchronized by Druid, so individual aggregators need to do it themselves
> if necessary. There was some effort to address this systematically:
> https://github.com/apache/incubator-druid/pull/3956, although it hasn't
> been finished yet. Check out some of the discussion on that patch for more
> background, and a question I just posted there: does it make more sense for
> ingestion-time aggregator thread-safety to be handled systematically (at
> the IncrementalIndex) or for each aggregator to need to be thread safe?
>
> If you're looking at "aggregate" and "get" in this file, those are the two
> that could get called simultaneously:
> https://github.com/apache/incubator-druid/blob/master/
> extensions-core/datasketches/src/main/java/io/druid/query/
> aggregation/datasketches/theta/SketchAggregator.java
>
> On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> >  Apologies, I must be missing something very basic in how incremental
> > indexing is working.A sketch is by itself an aggregator - it can absorb
> > millions of updates before it exceeds its space limit or is flushed to
> disk.
> > I assumed the ingestion thread aggregates data in multiple sketches in
> > parallel, then at query time a union operation is invoked to merge
> relevant
> > sketches based on the attributes of the query, and when the union is
> > completed its result is returned to the user. But in such scenario there
> is
> > no need to call get before the union is completed.
> > This means there is another scenario where union is used and can be
> > queried while in the process of executing the merge. Is this to maintain
> > some in-memory hierarchy of aggregations? or for creating the snapshots
> > that are flushed to disk?
> > A better understanding of the use case will help in presenting a better
> > thread-safe solution.
> > Thanks,Eshcar
> >
> >    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> > gian@apache.org> wrote:
> >
> >  Hi Eshcar,
> >
> > > But even in a single-writer-single-reader scenario removing the lock
> can
> > increase the throughput of accesses to the object.
> >
> > Definitely worth trying this out, imo.
> >
> > > However, I don't understand why is the union object read before the
> > result is ready.
> >
> > It's used as part of incremental indexing: the idea is that we create
> > aggregates during ingestion time and we want those to be queryable even
> > while ingestion is still ongoing. So the ingestion thread will be calling
> > "aggregate" and a query thread will be calling "get" potentially
> > simultaneously.
> >
> > On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
> > wrote:
> >
> > >  Thanks Gian,
> > > This is also my understanding.But even in a single-writer-single-reader
> > > scenario removing the lock can increase the throughput of accesses to
> the
> > > object.
> > > If the union is only used to produce the result at query time then
> > > removing the lock would not affect ingestion throughput, but could
> > decrease
> > > query latency.However, I don't understand why is the union object read
> > > before the result is ready.
> > >    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> > > gian@apache.org> wrote:
> > >
> > >  Hi Eshcar,
> > >
> > > To my knowledge, in the Druid Aggregator and BufferAggregator
> interfaces,
> > > the main place where concurrency happens is that "aggregate" and "get"
> > may
> > > be called simultaneously during realtime ingestion. So if there would
> be
> > a
> > > benefit from improving concurrency it would probably end up in that
> area.
> > >
> > > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <eshcar@oath.com.invalid
> >
> > > wrote:
> > >
> > > > Hi All,
> > > > My name is Eshcar Hillel from Oath research. I'm currently working
> with
> > > > Lee Rhodes on committing a new concurrent implementation of the theta
> > > > sketch to the sketches-core library.I was wondering whether this
> > > > implementation can help boost the union operation that is applied to
> > > > multiple sketches at query time in druid.From what I see in the code
> > the
> > > > sketch aggregator uses the SynchronizedUnion implementation, which
> > > > basically uses a lock at every single access (update/read) of the
> union
> > > > operation. We believe a thread-safe implementation of the union
> > operation
> > > > can help decrease the inherent overhead of the lock.
> > > > I will be happy to join the meeting today and briefly discuss this
> > > option.
> > > > Thanks,Eshcar
> > > >
> > > >
> > > >
> > >
> >
>
>

Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 I think part of my confusion stems from the gap in the level of abstraction.Druid terminology focuses on aggregation.But in sketches there are two levels of aggregation:A sketch is a first level aggregation which holds the gist of the stream, 
A union is a second level aggregation which can aggregate sketches.
Adding 2 questions to the questions below:
The locks are used to synchronize the access to the union - I assume the union is a second level aggregation merging sketches that are built during ingestion. 
3) If this is not the case then why does druid apply a union and not simply uses a sketch to aggregate the data?4) if it is the case then is it guaranteed that the merged sketches are immutable? otherwise wrapping the union with locks is not enough. 

I hope my questions make more sense now.
Thanks,Eshcar

    On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel <es...@oath.com> wrote:  
 
  Thanks Gian - I was missing the part about aggregation during ingestion time roll-up.
I looked at the SketchAggregator code and read the druid overview document let me verify that I got this right.Consider the best effort roll up mode: as events arrive they are ingested into multiple segments, call these s0...s9, but should belong to a single segment.Then a roll-up process ru aggregates s0...s9 one-by-one (?) into a single segment. During the roll up ru can be queried and therefore needs to be thread safe.
1) who is the "owner" of the roll up process? what triggers the roll-up thread? Is it considered as part of the ingestion/indexing time, or is it done at the background as a kind of an optimization?

2) The documents says "Data is queryable as soon as it isingested by the realtime processing logic." Does this means that queries can apply get to s0..s9? should they be thread safe as well?
 

    On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino <gi...@apache.org> wrote:  
 
 Hi Eshcar,

I don't think I 100% understand what you are asking, but I will say some
things, and hopefully they will be helpful.

In Druid we use aggregators for two things: aggregation during ingestion
(for ingestion-time rollup) and aggregation during queries. During queries
the aggregators are only ever used by one thread at a time. At ingestion
time, "aggregate" and "get" can be called simultaneously. It happens
because "aggregate" is called from an ingestion thread (because we update
running aggregators during ingestion), and "get" is called by query threads
(because they "get" those aggregator values from the ingestion aggregator
object to feed them to a query aggregator object). These calls are not
synchronized by Druid, so individual aggregators need to do it themselves
if necessary. There was some effort to address this systematically:
https://github.com/apache/incubator-druid/pull/3956, although it hasn't
been finished yet. Check out some of the discussion on that patch for more
background, and a question I just posted there: does it make more sense for
ingestion-time aggregator thread-safety to be handled systematically (at
the IncrementalIndex) or for each aggregator to need to be thread safe?

If you're looking at "aggregate" and "get" in this file, those are the two
that could get called simultaneously:
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  Apologies, I must be missing something very basic in how incremental
> indexing is working.A sketch is by itself an aggregator - it can absorb
> millions of updates before it exceeds its space limit or is flushed to disk.
> I assumed the ingestion thread aggregates data in multiple sketches in
> parallel, then at query time a union operation is invoked to merge relevant
> sketches based on the attributes of the query, and when the union is
> completed its result is returned to the user. But in such scenario there is
> no need to call get before the union is completed.
> This means there is another scenario where union is used and can be
> queried while in the process of executing the merge. Is this to maintain
> some in-memory hierarchy of aggregations? or for creating the snapshots
> that are flushed to disk?
> A better understanding of the use case will help in presenting a better
> thread-safe solution.
> Thanks,Eshcar
>
>    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> > But even in a single-writer-single-reader scenario removing the lock can
> increase the throughput of accesses to the object.
>
> Definitely worth trying this out, imo.
>
> > However, I don't understand why is the union object read before the
> result is ready.
>
> It's used as part of incremental indexing: the idea is that we create
> aggregates during ingestion time and we want those to be queryable even
> while ingestion is still ongoing. So the ingestion thread will be calling
> "aggregate" and a query thread will be calling "get" potentially
> simultaneously.
>
> On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> >  Thanks Gian,
> > This is also my understanding.But even in a single-writer-single-reader
> > scenario removing the lock can increase the throughput of accesses to the
> > object.
> > If the union is only used to produce the result at query time then
> > removing the lock would not affect ingestion throughput, but could
> decrease
> > query latency.However, I don't understand why is the union object read
> > before the result is ready.
> >    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> > gian@apache.org> wrote:
> >
> >  Hi Eshcar,
> >
> > To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> > the main place where concurrency happens is that "aggregate" and "get"
> may
> > be called simultaneously during realtime ingestion. So if there would be
> a
> > benefit from improving concurrency it would probably end up in that area.
> >
> > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> > wrote:
> >
> > > Hi All,
> > > My name is Eshcar Hillel from Oath research. I'm currently working with
> > > Lee Rhodes on committing a new concurrent implementation of the theta
> > > sketch to the sketches-core library.I was wondering whether this
> > > implementation can help boost the union operation that is applied to
> > > multiple sketches at query time in druid.From what I see in the code
> the
> > > sketch aggregator uses the SynchronizedUnion implementation, which
> > > basically uses a lock at every single access (update/read) of the union
> > > operation. We believe a thread-safe implementation of the union
> operation
> > > can help decrease the inherent overhead of the lock.
> > > I will be happy to join the meeting today and briefly discuss this
> > option.
> > > Thanks,Eshcar
> > >
> > >
> > >
> >
>
    

Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 Thanks Gian - I was missing the part about aggregation during ingestion time roll-up.
I looked at the SketchAggregator code and read the druid overview document let me verify that I got this right.Consider the best effort roll up mode: as events arrive they are ingested into multiple segments, call these s0...s9, but should belong to a single segment.Then a roll-up process ru aggregates s0...s9 one-by-one (?) into a single segment. During the roll up ru can be queried and therefore needs to be thread safe.
1) who is the "owner" of the roll up process? what triggers the roll-up thread? Is it considered as part of the ingestion/indexing time, or is it done at the background as a kind of an optimization?

2) The documents says "Data is queryable as soon as it isingested by the realtime processing logic." Does this means that queries can apply get to s0..s9? should they be thread safe as well?
 

    On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino <gi...@apache.org> wrote:  
 
 Hi Eshcar,

I don't think I 100% understand what you are asking, but I will say some
things, and hopefully they will be helpful.

In Druid we use aggregators for two things: aggregation during ingestion
(for ingestion-time rollup) and aggregation during queries. During queries
the aggregators are only ever used by one thread at a time. At ingestion
time, "aggregate" and "get" can be called simultaneously. It happens
because "aggregate" is called from an ingestion thread (because we update
running aggregators during ingestion), and "get" is called by query threads
(because they "get" those aggregator values from the ingestion aggregator
object to feed them to a query aggregator object). These calls are not
synchronized by Druid, so individual aggregators need to do it themselves
if necessary. There was some effort to address this systematically:
https://github.com/apache/incubator-druid/pull/3956, although it hasn't
been finished yet. Check out some of the discussion on that patch for more
background, and a question I just posted there: does it make more sense for
ingestion-time aggregator thread-safety to be handled systematically (at
the IncrementalIndex) or for each aggregator to need to be thread safe?

If you're looking at "aggregate" and "get" in this file, those are the two
that could get called simultaneously:
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  Apologies, I must be missing something very basic in how incremental
> indexing is working.A sketch is by itself an aggregator - it can absorb
> millions of updates before it exceeds its space limit or is flushed to disk.
> I assumed the ingestion thread aggregates data in multiple sketches in
> parallel, then at query time a union operation is invoked to merge relevant
> sketches based on the attributes of the query, and when the union is
> completed its result is returned to the user. But in such scenario there is
> no need to call get before the union is completed.
> This means there is another scenario where union is used and can be
> queried while in the process of executing the merge. Is this to maintain
> some in-memory hierarchy of aggregations? or for creating the snapshots
> that are flushed to disk?
> A better understanding of the use case will help in presenting a better
> thread-safe solution.
> Thanks,Eshcar
>
>    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> > But even in a single-writer-single-reader scenario removing the lock can
> increase the throughput of accesses to the object.
>
> Definitely worth trying this out, imo.
>
> > However, I don't understand why is the union object read before the
> result is ready.
>
> It's used as part of incremental indexing: the idea is that we create
> aggregates during ingestion time and we want those to be queryable even
> while ingestion is still ongoing. So the ingestion thread will be calling
> "aggregate" and a query thread will be calling "get" potentially
> simultaneously.
>
> On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> >  Thanks Gian,
> > This is also my understanding.But even in a single-writer-single-reader
> > scenario removing the lock can increase the throughput of accesses to the
> > object.
> > If the union is only used to produce the result at query time then
> > removing the lock would not affect ingestion throughput, but could
> decrease
> > query latency.However, I don't understand why is the union object read
> > before the result is ready.
> >    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> > gian@apache.org> wrote:
> >
> >  Hi Eshcar,
> >
> > To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> > the main place where concurrency happens is that "aggregate" and "get"
> may
> > be called simultaneously during realtime ingestion. So if there would be
> a
> > benefit from improving concurrency it would probably end up in that area.
> >
> > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> > wrote:
> >
> > > Hi All,
> > > My name is Eshcar Hillel from Oath research. I'm currently working with
> > > Lee Rhodes on committing a new concurrent implementation of the theta
> > > sketch to the sketches-core library.I was wondering whether this
> > > implementation can help boost the union operation that is applied to
> > > multiple sketches at query time in druid.From what I see in the code
> the
> > > sketch aggregator uses the SynchronizedUnion implementation, which
> > > basically uses a lock at every single access (update/read) of the union
> > > operation. We believe a thread-safe implementation of the union
> operation
> > > can help decrease the inherent overhead of the lock.
> > > I will be happy to join the meeting today and briefly discuss this
> > option.
> > > Thanks,Eshcar
> > >
> > >
> > >
> >
>
  

Re: Question about sketches aggregation in druid

Posted by Gian Merlino <gi...@apache.org>.
Hi Eshcar,

I don't think I 100% understand what you are asking, but I will say some
things, and hopefully they will be helpful.

In Druid we use aggregators for two things: aggregation during ingestion
(for ingestion-time rollup) and aggregation during queries. During queries
the aggregators are only ever used by one thread at a time. At ingestion
time, "aggregate" and "get" can be called simultaneously. It happens
because "aggregate" is called from an ingestion thread (because we update
running aggregators during ingestion), and "get" is called by query threads
(because they "get" those aggregator values from the ingestion aggregator
object to feed them to a query aggregator object). These calls are not
synchronized by Druid, so individual aggregators need to do it themselves
if necessary. There was some effort to address this systematically:
https://github.com/apache/incubator-druid/pull/3956, although it hasn't
been finished yet. Check out some of the discussion on that patch for more
background, and a question I just posted there: does it make more sense for
ingestion-time aggregator thread-safety to be handled systematically (at
the IncrementalIndex) or for each aggregator to need to be thread safe?

If you're looking at "aggregate" and "get" in this file, those are the two
that could get called simultaneously:
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  Apologies, I must be missing something very basic in how incremental
> indexing is working.A sketch is by itself an aggregator - it can absorb
> millions of updates before it exceeds its space limit or is flushed to disk.
> I assumed the ingestion thread aggregates data in multiple sketches in
> parallel, then at query time a union operation is invoked to merge relevant
> sketches based on the attributes of the query, and when the union is
> completed its result is returned to the user. But in such scenario there is
> no need to call get before the union is completed.
> This means there is another scenario where union is used and can be
> queried while in the process of executing the merge. Is this to maintain
> some in-memory hierarchy of aggregations? or for creating the snapshots
> that are flushed to disk?
> A better understanding of the use case will help in presenting a better
> thread-safe solution.
> Thanks,Eshcar
>
>     On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> > But even in a single-writer-single-reader scenario removing the lock can
> increase the throughput of accesses to the object.
>
> Definitely worth trying this out, imo.
>
> > However, I don't understand why is the union object read before the
> result is ready.
>
> It's used as part of incremental indexing: the idea is that we create
> aggregates during ingestion time and we want those to be queryable even
> while ingestion is still ongoing. So the ingestion thread will be calling
> "aggregate" and a query thread will be calling "get" potentially
> simultaneously.
>
> On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> >  Thanks Gian,
> > This is also my understanding.But even in a single-writer-single-reader
> > scenario removing the lock can increase the throughput of accesses to the
> > object.
> > If the union is only used to produce the result at query time then
> > removing the lock would not affect ingestion throughput, but could
> decrease
> > query latency.However, I don't understand why is the union object read
> > before the result is ready.
> >    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> > gian@apache.org> wrote:
> >
> >  Hi Eshcar,
> >
> > To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> > the main place where concurrency happens is that "aggregate" and "get"
> may
> > be called simultaneously during realtime ingestion. So if there would be
> a
> > benefit from improving concurrency it would probably end up in that area.
> >
> > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> > wrote:
> >
> > > Hi All,
> > > My name is Eshcar Hillel from Oath research. I'm currently working with
> > > Lee Rhodes on committing a new concurrent implementation of the theta
> > > sketch to the sketches-core library.I was wondering whether this
> > > implementation can help boost the union operation that is applied to
> > > multiple sketches at query time in druid.From what I see in the code
> the
> > > sketch aggregator uses the SynchronizedUnion implementation, which
> > > basically uses a lock at every single access (update/read) of the union
> > > operation. We believe a thread-safe implementation of the union
> operation
> > > can help decrease the inherent overhead of the lock.
> > > I will be happy to join the meeting today and briefly discuss this
> > option.
> > > Thanks,Eshcar
> > >
> > >
> > >
> >
>

Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 Apologies, I must be missing something very basic in how incremental indexing is working.A sketch is by itself an aggregator - it can absorb millions of updates before it exceeds its space limit or is flushed to disk.
I assumed the ingestion thread aggregates data in multiple sketches in parallel, then at query time a union operation is invoked to merge relevant sketches based on the attributes of the query, and when the union is completed its result is returned to the user. But in such scenario there is no need to call get before the union is completed.
This means there is another scenario where union is used and can be queried while in the process of executing the merge. Is this to maintain some in-memory hierarchy of aggregations? or for creating the snapshots that are flushed to disk?
A better understanding of the use case will help in presenting a better thread-safe solution.
Thanks,Eshcar

    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <gi...@apache.org> wrote:  
 
 Hi Eshcar,

> But even in a single-writer-single-reader scenario removing the lock can
increase the throughput of accesses to the object.

Definitely worth trying this out, imo.

> However, I don't understand why is the union object read before the
result is ready.

It's used as part of incremental indexing: the idea is that we create
aggregates during ingestion time and we want those to be queryable even
while ingestion is still ongoing. So the ingestion thread will be calling
"aggregate" and a query thread will be calling "get" potentially
simultaneously.

On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  Thanks Gian,
> This is also my understanding.But even in a single-writer-single-reader
> scenario removing the lock can increase the throughput of accesses to the
> object.
> If the union is only used to produce the result at query time then
> removing the lock would not affect ingestion throughput, but could decrease
> query latency.However, I don't understand why is the union object read
> before the result is ready.
>    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library.I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in druid.From what I see in the code the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically uses a lock at every single access (update/read) of the union
> > operation. We believe a thread-safe implementation of the union operation
> > can help decrease the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> option.
> > Thanks,Eshcar
> >
> >
> >
>
  

Re: Question about sketches aggregation in druid

Posted by Gian Merlino <gi...@apache.org>.
Hi Eshcar,

> But even in a single-writer-single-reader scenario removing the lock can
increase the throughput of accesses to the object.

Definitely worth trying this out, imo.

> However, I don't understand why is the union object read before the
result is ready.

It's used as part of incremental indexing: the idea is that we create
aggregates during ingestion time and we want those to be queryable even
while ingestion is still ongoing. So the ingestion thread will be calling
"aggregate" and a query thread will be calling "get" potentially
simultaneously.

On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

>  Thanks Gian,
> This is also my understanding.But even in a single-writer-single-reader
> scenario removing the lock can increase the throughput of accesses to the
> object.
> If the union is only used to produce the result at query time then
> removing the lock would not affect ingestion throughput, but could decrease
> query latency.However, I don't understand why is the union object read
> before the result is ready.
>     On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> gian@apache.org> wrote:
>
>  Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library.I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in druid.From what I see in the code the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically uses a lock at every single access (update/read) of the union
> > operation. We believe a thread-safe implementation of the union operation
> > can help decrease the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> option.
> > Thanks,Eshcar
> >
> >
> >
>

Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 Thanks Gian,
This is also my understanding.But even in a single-writer-single-reader scenario removing the lock can increase the throughput of accesses to the object.
If the union is only used to produce the result at query time then removing the lock would not affect ingestion throughput, but could decrease query latency.However, I don't understand why is the union object read before the result is ready.
    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <gi...@apache.org> wrote:  
 
 Hi Eshcar,

To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
the main place where concurrency happens is that "aggregate" and "get" may
be called simultaneously during realtime ingestion. So if there would be a
benefit from improving concurrency it would probably end up in that area.

On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

> Hi All,
> My name is Eshcar Hillel from Oath research. I'm currently working with
> Lee Rhodes on committing a new concurrent implementation of the theta
> sketch to the sketches-core library.I was wondering whether this
> implementation can help boost the union operation that is applied to
> multiple sketches at query time in druid.From what I see in the code the
> sketch aggregator uses the SynchronizedUnion implementation, which
> basically uses a lock at every single access (update/read) of the union
> operation. We believe a thread-safe implementation of the union operation
> can help decrease the inherent overhead of the lock.
> I will be happy to join the meeting today and briefly discuss this option.
> Thanks,Eshcar
>
>
>
  

Re: Question about sketches aggregation in druid

Posted by Eshcar Hillel <es...@oath.com.INVALID>.
 Hi Himanshu,
We did not experiment with the union operation, but these are toy-example throughput numbers we got for a *single* thread writing to a theta sketch:
basic (sketches-core) 167M, 
lock-based 32M, 
our concurrent 115M  (223M with 2 threads, and scaling linearly).The experiments were done in a single sketch environment, and the results might be different with multiple sketches, however it captures the potential gain of removing the lock.
Thanks,Eshcar

    On Wednesday, July 11, 2018, 2:26:53 AM GMT+3, Himanshu <g....@gmail.com> wrote:  
 
 Yes, SynchronizedUnion was added on seeing concurrency error on the
processes doing realtime ingestion. On other processes, it is not called
concurrently and hope is that synchronization does not cause overhead when
that piece of code is only called in one thread because it gets optimized.

On Tue, Jul 10, 2018 at 10:13 AM, Gian Merlino <gi...@apache.org> wrote:

> Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library.I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in druid.From what I see in the code the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically uses a lock at every single access (update/read) of the union
> > operation. We believe a thread-safe implementation of the union operation
> > can help decrease the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> option.
> > Thanks,Eshcar
> >
> >
> >
>
  

Re: Question about sketches aggregation in druid

Posted by Himanshu <g....@gmail.com>.
Yes, SynchronizedUnion was added on seeing concurrency error on the
processes doing realtime ingestion. On other processes, it is not called
concurrently and hope is that synchronization does not cause overhead when
that piece of code is only called in one thread because it gets optimized.

On Tue, Jul 10, 2018 at 10:13 AM, Gian Merlino <gi...@apache.org> wrote:

> Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library.I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in druid.From what I see in the code the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically uses a lock at every single access (update/read) of the union
> > operation. We believe a thread-safe implementation of the union operation
> > can help decrease the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> option.
> > Thanks,Eshcar
> >
> >
> >
>

Re: Question about sketches aggregation in druid

Posted by Gian Merlino <gi...@apache.org>.
Hi Eshcar,

To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
the main place where concurrency happens is that "aggregate" and "get" may
be called simultaneously during realtime ingestion. So if there would be a
benefit from improving concurrency it would probably end up in that area.

On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel <es...@oath.com.invalid>
wrote:

> Hi All,
> My name is Eshcar Hillel from Oath research. I'm currently working with
> Lee Rhodes on committing a new concurrent implementation of the theta
> sketch to the sketches-core library.I was wondering whether this
> implementation can help boost the union operation that is applied to
> multiple sketches at query time in druid.From what I see in the code the
> sketch aggregator uses the SynchronizedUnion implementation, which
> basically uses a lock at every single access (update/read) of the union
> operation. We believe a thread-safe implementation of the union operation
> can help decrease the inherent overhead of the lock.
> I will be happy to join the meeting today and briefly discuss this option.
> Thanks,Eshcar
>
>
>