Posted to dev@accumulo.apache.org by z11373 <z1...@outlook.com> on 2015/08/27 00:11:06 UTC

using combiner vs. building stats cache

Hi,
Apologies if this question has been asked before (which I suspect it has).
I am building a triple store and need to build a stats table that will be
used for query optimization (i.e. re-ordering the query triple patterns).
There may be more than two solutions for this, but the two I know are:
1. Manually rebuild the whole stats table; this could run once per day, for
example.
This option is expensive because we re-calculate all rows in the master
table, but the end result is that no computation is needed when we retrieve
the stat info. For example, we just query the stats table for the word
'foo', and it returns a single row with the total count for that word.

2. Use an Accumulo combiner.
With this option, we simply add a counter entry to the stats table (i.e.
insert ['foo', 1]) whenever we insert 'foo' into the master table. When we
want the stat info at query time, Accumulo aggregates all the counts for the
word 'foo' in map-reduce fashion.
For #2 we pay the cost at scan time, but if the rows containing 'foo' only
number in the hundreds, I guess it won't be so bad, because the aggregation
is done on the server side (and it should be efficient given Accumulo's
design).
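
To make option #2 concrete, here is a rough sketch of the write and read
side with the Java API. It assumes a stats table named 'stats' with a
SummingCombiner already attached using STRING encoding; the table and column
names ('stats', 'stat', 'count') are made up for illustration:

  import java.util.Map;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.hadoop.io.Text;

  // ingest side: every occurrence of a term writes a '1'; the combiner sums them
  static void countTerm(BatchWriter statsWriter, String term) throws Exception {
    Mutation m = new Mutation(new Text(term));
    m.put(new Text("stat"), new Text("count"), new Value("1".getBytes()));
    statsWriter.addMutation(m);
  }

  // query side: the scan returns a single, already-summed value per term
  static long getCount(Connector conn, String term) throws Exception {
    Scanner s = conn.createScanner("stats", Authorizations.EMPTY);
    s.setRange(Range.exact(new Text(term)));
    long count = 0;
    for (Map.Entry<Key,Value> e : s) {
      count = Long.parseLong(e.getValue().toString());  // STRING-encoded sum
    }
    return count;
  }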

I prefer option #2, but I am not sure how expensive it is on Accumulo,
especially since we run a large number of queries per day, compared to a
stats re-calculation process that runs only once per day. Any comments on
this? Please let me know if my problem statement or the question is unclear.


Thanks,
Z




Re: using combiner vs. building stats cache

Posted by Josh Elser <jo...@gmail.com>.
> Thanks for the tips Josh. We are using BatchWriter, so it should perform
> better throughput. But I just looked at the code, and it looks like we call
> batchWriter.flush() after each addMutation call. This doesn't seem a good
> utilization of batch writer...
> I am curious on how normally people batch the insert/update? The process may
> crash, and we'll lose those changes unfortunately :-(

Yes, any mutations sent before flush() (or close()) successfully returns 
might not be durable. You would need to have some logic in your 
application to work with this constraint. Hard to give recommendations 
on how to work with this without knowing your workflow. Using a combiner 
is slightly more difficult as sending the same mutation multiple times 
will make your stats incorrect.

Our failure condition could be nicer in this case (ideally, providing 
you the mutations that weren't applied), but that's something that would 
have to be implemented on our side (and no one is working on that 
presently to my knowledge). I'm not sure if there's something easy we 
could do that would make this failure handling easier for you.
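
Roughly, the failure surfaces like this in client code (a sketch only;
'writer' is a BatchWriter and 'm' a Mutation, and how you recover is up to
the application):

  try {
    writer.addMutation(m);
    writer.flush();   // mutations are only guaranteed durable once this
                      // (or close()) returns without throwing
  } catch (MutationsRejectedException e) {
    // Some mutations were not applied, but you are not told exactly which
    // ones already made it in, so blindly re-sending the whole batch can
    // double-count once a combiner is involved.
  }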

Re: using combiner vs. building stats cache

Posted by z11373 <z1...@outlook.com>.
Thanks Dylan.
I'll start with a simple implementation first and improve it later if
crashes happen often or performance becomes an issue.

Thanks,
zainal




Re: using combiner vs. building stats cache

Posted by Dylan Hutchison <dh...@uw.edu>.
On the BatchWriter, I think it starts flushing data in the background once
enough mutations are added to it to consume half the BatchWriter's max
memory usage set in BatchWriterConfig.  One approach is to set an
appropriate max memory, then never worry about manually flushing and let
the BatchWriter handle flushing as data is added to it.  Make sure to call
close() after everything is finished.  Of course this does not solve the
data loss problem if your app crashes before close()...
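
Something along these lines (the numbers are arbitrary, 'conn' is a
Connector, and 'mutations' stands in for whatever you are ingesting):

  BatchWriterConfig cfg = new BatchWriterConfig();
  cfg.setMaxMemory(50 * 1024 * 1024);        // buffer up to ~50 MB of mutations
  cfg.setMaxLatency(2, TimeUnit.MINUTES);    // and flush at least this often
  cfg.setMaxWriteThreads(4);
  BatchWriter bw = conn.createBatchWriter("stats", cfg);
  try {
    for (Mutation m : mutations) {
      bw.addMutation(m);   // no manual flush(); background flushing kicks in
    }
  } finally {
    bw.close();            // pushes out whatever is still buffered
  }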

On the other hand, the idea Josh presented is to do something called
pre-summing: if the entry ('foo', 1) is added twice to your buffer before
the buffer is flushed, then why not take the two ('foo', 1) entries out and
replace them with a single ('foo', 2) entry to write to your combining
table?  If you take this approach you will have to use your own buffer
rather than the automatic one inside BatchWriter.  It's a simple concept
but requires extra programming and only makes sense when you really want
that last ounce of performance at scale.  Using an LRU cache with
write-on-eviction is a place to start.  It's also harder to do when the
terms you want to count are spread uniformly at random through your input
documents, since the likelihood of a collision occurring in your buffer is
low unless the buffer is large or the number of unique terms is small.
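
A rough sketch of such a pre-summing buffer, using a LinkedHashMap in
access order as the LRU (the sizes and the 'stat'/'count' column names are
arbitrary, and error handling is minimal):

  import java.util.LinkedHashMap;
  import java.util.Map;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.MutationsRejectedException;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;
  import org.apache.hadoop.io.Text;

  // Accumulates per-term counts client-side and writes a single summed entry
  // to the combiner table when a term is evicted (write-on-eviction).
  class PreSummingBuffer extends LinkedHashMap<String, Long> {
    private final BatchWriter writer;
    private final int maxEntries;

    PreSummingBuffer(BatchWriter writer, int maxEntries) {
      super(16, 0.75f, true);           // access order, i.e. LRU
      this.writer = writer;
      this.maxEntries = maxEntries;
    }

    void increment(String term) {
      merge(term, 1L, Long::sum);       // pre-sum in memory
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
      if (size() > maxEntries) {
        write(eldest.getKey(), eldest.getValue());
        return true;                    // evict after writing its total
      }
      return false;
    }

    void write(String term, long count) {
      try {
        Mutation m = new Mutation(new Text(term));
        m.put(new Text("stat"), new Text("count"),
            new Value(Long.toString(count).getBytes()));
        writer.addMutation(m);
      } catch (MutationsRejectedException e) {
        throw new RuntimeException(e);
      }
    }

    void drain() {                      // call once ingest is finished
      for (Map.Entry<String, Long> e : entrySet()) {
        write(e.getKey(), e.getValue());
      }
      clear();
    }
  }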

Conclusion: use the BatchWriter's own mechanisms for an easy and
well-performing solution.  Implement your own buffering (or another way to
do pre-summing) for bonus brownie performance points.

On Fri, Aug 28, 2015 at 4:37 PM, z11373 <z1...@outlook.com> wrote:

> Thanks Josh and Adam!
> My bad, I looked at the code again, actually we only call flush in the end
> (that override function we have only called at the end), so I have another
> issue here, which is the mutation will be lost when the app crash. I will
> think about more on how to mitigate this issue.
>
> Thanks for mentioning about the batch writer semantic. Luckily for our
> case,
> the count doesn't need to be so accurate, as it's more like for the
> optimizer to re-order the queries based on cardinality. The stats
> discrepancy needs to be big enough to screw the result, otherwise it won't
> matter much. This is good tips though, and I'll pay attention on it in the
> future.
>
>
> Thanks,
> Z
>
>
>
>

Re: using combiner vs. building stats cache

Posted by z11373 <z1...@outlook.com>.
Thanks Josh and Adam!
My bad; I looked at the code again and we actually only call flush at the
end (that override function of ours is only called at the end), so I have
another issue here: the mutations will be lost if the app crashes. I will
think more about how to mitigate this issue.

Thanks for mentioning the batch writer semantics. Luckily for our case the
count doesn't need to be exact, since it's only used by the optimizer to
re-order the queries based on cardinality. A stats discrepancy would need to
be quite large to skew the result; otherwise it won't matter much. These are
good tips though, and I'll pay attention to them in the future.


Thanks,
Z




Re: using combiner vs. building stats cache

Posted by Adam Fuchs <af...@apache.org>.
Calling flush after every write will probably slow you down by more than
1000x, since the flush call is on the order of 10-100ms. Keeping a buffer
of data at your client and only flushing when the buffer is full is usually
a pretty decent strategy. That way you can replay from the buffer in case
of a client failure. Many upstream processing architectures (like Kafka and
Flume) have something like a checkpoint marker that you might be able to
leverage for this purpose.
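
For example, if the input were a file with one term per line, a sketch of
that pattern might look like the following (the paths, batch size, and the
'stat'/'count' column names are made up; a real pipeline would checkpoint
into Kafka/Flume instead of a local file):

  import java.nio.charset.StandardCharsets;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.util.List;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;
  import org.apache.hadoop.io.Text;

  // Persist the input position only after flush() has returned, so a crashed
  // client restarts from the last durable position instead of losing data.
  static void ingest(BatchWriter writer, Path input, Path checkpoint) throws Exception {
    int start = Files.exists(checkpoint)
        ? Integer.parseInt(new String(Files.readAllBytes(checkpoint), StandardCharsets.UTF_8).trim())
        : 0;
    List<String> lines = Files.readAllLines(input, StandardCharsets.UTF_8);
    for (int i = start; i < lines.size(); i++) {
      Mutation m = new Mutation(new Text(lines.get(i)));
      m.put(new Text("stat"), new Text("count"), new Value("1".getBytes()));
      writer.addMutation(m);
      if ((i + 1) % 10000 == 0 || i == lines.size() - 1) {
        writer.flush();   // everything up to line i is durable after this
        Files.write(checkpoint, Integer.toString(i + 1).getBytes(StandardCharsets.UTF_8));
      }
    }
  }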

One tricky issue is that the BatchWriter has an underlying semantic of "at
least once", meaning it is possible under some failure conditions to ingest
data multiple times. With combiners that means your values could end up
being inconsistent. It is not possible to get "once and only once"
semantics with the BatchWriter. Depending on how much you care about your
counts being accurate under these failure modes, this may not be a problem
for you. If it is, you may want to do something a bit more complicated like
write data using bulk imports [1] or implement some type of lambda
architecture [2] to get eventually consistent counts.

Cheers,
Adam

[1] https://accumulo.apache.org/1.7/accumulo_user_manual.html#_bulk_ingest
[2] https://en.wikipedia.org/wiki/Lambda_architecture


On Fri, Aug 28, 2015 at 12:08 PM, z11373 <z1...@outlook.com> wrote:

> Thanks Dylan and late chimer Josh, who is always helpful..
>
> After Dylan's reply, I did a quick experiment:
> 1. Set SummingCombiner -all (scan, minor and major compaction) on the table
> 2. Delete default vers iter from the table (the reason is I just want to
> see
> if the rows got 'combined' or not)
> 3. Insert row id = 'foo' and value = 1
> 4. Insert row id = 'foo' and value = 1
> 5. Scan will return 1 row: 'foo', 2 (so this is correct as expected)
> 6. Delete the summing combiner, so the table doesn't have any iterators now
> 7. Scan the table again, and now it returns 2 rows (both are 'foo', 1)
>
> Then I deleted the table, and redo all steps above, except replace step #5
> with "flush -w". At step #7, it now returns 1 row: 'foo', 2 (this is what I
> want, which means the combiner result got persisted, instead of being
> calculated everytime).
>
> Therefore, the approach I was thinking about writing the snapshot to
> another
> table (because I wanted to avoid aggregation operation every scan) is no
> longer needed, since Accumulo has taken care of this. After compaction,
> it'll have 1 row for each unique key with aggregate value. Cool!
>
> Thanks for the tips Josh. We are using BatchWriter, so it should perform
> better throughput. But I just looked at the code, and it looks like we call
> batchWriter.flush() after each addMutation call. This doesn't seem a good
> utilization of batch writer...
> I am curious on how normally people batch the insert/update? The process
> may
> crash, and we'll lose those changes unfortunately :-(
>
> Thanks,
> Z
>
> Thanks,
> Z
>
>
>
>

Re: using combiner vs. building stats cache

Posted by z11373 <z1...@outlook.com>.
Thanks Dylan, and late chimer Josh, who is always helpful.

After Dylan's reply, I did a quick experiment:
1. Set SummingCombiner -all (scan, minor and major compaction) on the table
2. Delete the default versioning iterator (vers) from the table (so I can
see whether the rows actually get 'combined')
3. Insert row id = 'foo' and value = 1
4. Insert row id = 'foo' and value = 1
5. Scan returns 1 row: 'foo', 2 (correct, as expected)
6. Delete the summing combiner, so the table has no iterators
7. Scan the table again; it now returns 2 rows (both are 'foo', 1)

Then I deleted the table and redid all the steps above, except that I
replaced step #5 with "flush -w". At step #7 it now returns 1 row: 'foo', 2
(which is what I want: the combined result got persisted instead of being
recalculated every time).
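
(For reference, the rough Java equivalents of those shell steps, with
'stats' as a stand-in table name and 'conn' an Accumulo Connector:

  // "flush -w": force a minor compaction and wait; the minc-scoped combiner
  // runs as the in-memory entries are written out to an RFile
  conn.tableOperations().flush("stats", null, null, true);

  // a full major compaction does the same across the existing RFiles
  conn.tableOperations().compact("stats", null, null, true, true);
)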

Therefore, the approach I was considering, writing a snapshot to another
table (to avoid the aggregation on every scan), is no longer needed, since
Accumulo already takes care of this. After compaction there is one row per
unique key holding the aggregated value. Cool!

Thanks for the tips, Josh. We are using a BatchWriter, so it should give
better throughput. But I just looked at the code, and it looks like we call
batchWriter.flush() after each addMutation call. That doesn't seem like a
good use of the batch writer...
I am curious how people normally batch their inserts/updates. The process
may crash, and we'd lose those changes, unfortunately :-(

Thanks,
Z





Re: using combiner vs. building stats cache

Posted by Josh Elser <jo...@gmail.com>.
Late chime in: Dylan and Russ are on the money. A combiner is the way to go.

And since there was some confusion on the matter: a table with 100 '1' 
values for a given key requires the tabletserver to sum those values and 
then return the result to the client. After the table compacts (assuming a 
full compaction), those 100 '1' values are rewritten on disk as a single 
'100' value. The beauty of this is that, as an application, you don't have 
to know whether the values were combined by a tabletserver before you saw 
them or whether the value came directly from disk. You just get a strongly 
consistent view of the data in your table.

Assuming you do go the combiner route, beware of writing a single '1' 
update for every "term" you see. If you can do some batching of updates 
to your stats table before writing to Accumulo (split the combination 
work between your client and the servers), you should see better 
throughput than sending lots of updates to the stats table.

Dylan Hutchison wrote:
> Sounds like you have the idea now Z.  There are three places an iterator
> can be applied: scan time, minor compaction time, and major compaction
> time.  Minor compactions help your case a lot-- when enough entries are
> written to a tablet server that the tablet server needs to dump them to a
> new Hadoop RFile, the minor compaction iterators run on the entries as they
> stream to the RFile.  This means that each RFile has only one entry for
> each unique (row, column family, column qualifier) tuple.
>
> Entries with the same (row, column family, column qualifier) in distinct
> RFiles will get combined at the next major compaction, or on the fly during
> the next scan.
>
> For example, let say there are 100 rows of [foo, 1], it will actually be
>> 'combined' to a single row [foo, 100]?
>
>
> Careful-- Accumulo's combiners combine on Keys with identical row, column
> family and column qualifier.  You'd have to make a more fancy iterator if
> you want to combine all the entries that share the same row.  Let us know
> if you need help doing that.
>
>
> On Thu, Aug 27, 2015 at 3:09 PM, z11373<z1...@outlook.com>  wrote:
>
>> Thanks again Russ!
>>
>> "but it might not be in this case if most of the data has already been
>> combined"
>> Does this mean Accumulo actually combine and persist the combined result
>> after the scan/compaction (depending on which op the combiner is applied)?
>> For example, let say there are 100 rows of [foo, 1], it will actually be
>> 'combined' to a single row [foo, 100]? If that is the case, then combiner
>> is
>> not expensive.
>>
>> Wow! that's brilliant using -1 approach, I didn't even think about it
>> before. Yes, this will work for my case because i only need to know the
>> count.
>>
>> Thanks,
>> Z
>>
>>
>>
>>
>

Re: using combiner vs. building stats cache

Posted by Dylan Hutchison <dh...@mit.edu>.
Sounds like you have the idea now Z.  There are three places an iterator
can be applied: scan time, minor compaction time, and major compaction
time.  Minor compactions help your case a lot-- when enough entries are
written to a tablet server that the tablet server needs to dump them to a
new Hadoop RFile, the minor compaction iterators run on the entries as they
stream to the RFile.  This means that each RFile has only one entry for
each unique (row, column family, column qualifier) tuple.

Entries with the same (row, column family, column qualifier) in distinct
RFiles will get combined at the next major compaction, or on the fly during
the next scan.

For example, let say there are 100 rows of [foo, 1], it will actually be
> 'combined' to a single row [foo, 100]?


Careful-- Accumulo's combiners combine on Keys with identical row, column
family and column qualifier.  You'd have to make a more fancy iterator if
you want to combine all the entries that share the same row.  Let us know
if you need help doing that.
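
For example, a combiner attached like the fragment below only sums entries
whose row, column family and column qualifier all match; the table and
column names are only illustrative, and 'conn' is a Connector:

  IteratorSetting is = new IteratorSetting(10, "sum", SummingCombiner.class);
  SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
  // sum values within the ("stat", "count") column, per row
  SummingCombiner.setColumns(is,
      Collections.singletonList(new IteratorSetting.Column("stat", "count")));
  // or, to sum every column in the table:
  // SummingCombiner.setCombineAllColumns(is, true);
  conn.tableOperations().attachIterator("stats", is);  // scan, minc and majc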


On Thu, Aug 27, 2015 at 3:09 PM, z11373 <z1...@outlook.com> wrote:

> Thanks again Russ!
>
> "but it might not be in this case if most of the data has already been
> combined"
> Does this mean Accumulo actually combine and persist the combined result
> after the scan/compaction (depending on which op the combiner is applied)?
> For example, let say there are 100 rows of [foo, 1], it will actually be
> 'combined' to a single row [foo, 100]? If that is the case, then combiner
> is
> not expensive.
>
> Wow! that's brilliant using -1 approach, I didn't even think about it
> before. Yes, this will work for my case because i only need to know the
> count.
>
> Thanks,
> Z
>
>
>
>

Re: using combiner vs. building stats cache

Posted by z11373 <z1...@outlook.com>.
Thanks again Russ!

"but it might not be in this case if most of the data has already been
combined"
Does this mean Accumulo actually combines and persists the combined result
after the scan/compaction (depending on which operation the combiner is
applied to)? For example, say there are 100 rows of [foo, 1]; will they
actually be 'combined' into a single row [foo, 100]? If that is the case,
then the combiner is not expensive.

Wow, the -1 approach is brilliant; I didn't even think of it. Yes, this will
work for my case because I only need to know the count.

Thanks,
Z




Re: using combiner vs. building stats cache

Posted by Russ Weeks <rw...@newbrightidea.com>.
On Thu, Aug 27, 2015 at 9:33 AM z11373 <z1...@outlook.com> wrote:

> Russ: I like your idea (indeed best of both worlds), so during compaction
> time, we can store that stats info to another table (but this time it will
> be only single row, hence won't affect query time). So I can add the code
> to
> insert to another table in the reduce() of my custom combiner, right? Or is
> there a better way?
>

No, don't trigger the table snapshot or compaction from inside your
combiner. I'd do it as a scheduled task via cron or something like that. A
full major compaction is generally seen as a big job, but it might not be
in this case if most of the data has already been combined. Alternatively,
if you can isolate a range of rows to be compacted you can pass that into
TableOperations.compact to speed things up.
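
For instance (a fragment; the start and end rows here are placeholders and
'conn' is a Connector):

  // e.g. from a nightly scheduled task: compact only the rows that changed
  conn.tableOperations().compact("stats",
      new Text("f"),    // start row
      new Text("g"),    // end row
      true,             // flush in-memory entries first
      true);            // block until the compaction finishes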

I think the only way to guarantee that your scans of the snapshot are
dealing with totally compacted data is to compact after the snapshot. But I
think if you want both the original table and the snapshot to get the
benefit of compaction, you'd want to compact before the snapshot and accept
the risk that there might be a little bit of uncompacted data in the
snapshot.

Honestly, this is how I *think* it should all work, but there are probably
people on this list who are more familiar with combiners, snapshots and
compaction than me.

Let say we have table called TEMP_STATS which we apply the custom combiner.
> During ingestion, we simply insert a row, i.e. ['foo', 1] to the table.
> Next
> time insert ['foo', 1], and so on. Let say we have 10 rows of 'foo', so
> reading that word would return 'foo', 10 (thanks to combiner). Now I want
> to
> delete only 1 row, so that it'd return 'foo', 9 instead. What is the best
> way to do this?
>

If all you're doing in your stats table is tracking counts, then you could
insert 'foo':-1 and the count will be adjusted correctly. If you're also
tracking mins and maxes, you'll need a different approach... which I would
be fascinated to understand because it seems like a very hard problem.
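
i.e. something like this, with the same made-up column names as before and
'statsWriter' a BatchWriter on the stats table:

  Mutation m = new Mutation(new Text("foo"));
  m.put(new Text("stat"), new Text("count"), new Value("-1".getBytes()));
  statsWriter.addMutation(m);   // the SummingCombiner folds this into count - 1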

-Russ

One option I could think is to apply another identifier, i.e. seq number, so
> it'd insert ['foo', 1, 1], ['foo', 2, 1], and so on (the second number will
> be the seq# and can be stored as column qualifier). Then I have to modify
> the combiner to make it also returns the highest seq# (i.e. 'foo', 10, 10).
> When deleting for one item only, I could just put delete 'foo', :10, and it
> will only mark that row as deleted. Any other better approach?
>
>
> Thanks,
> Z
>
>
>
>

Re: using combiner vs. building stats cache

Posted by z11373 <z1...@outlook.com>.
Thanks Dylan and Russ!

Dylan: I guess that option is OK if there are only a few hundred entries in
total for a given word, but if the word 'foo' has a million entries, then
Accumulo still has to go through that million items to sum the count, which
I think will be expensive even though it doesn't have to return those rows
to the client.

Russ: I like your idea (indeed the best of both worlds): at compaction time
we can store the stats info in another table (this time as a single row, so
it won't affect query time). So I could add the code that inserts into the
other table in the reduce() of my custom combiner, right? Or is there a
better way?


Another question: I'd think using a combiner would also be perfect for the
delete scenario, since it doesn't need to re-calculate the whole thing.
However, how do I delete just one row from the rows in the to-be-combined
table? Let me give an example below to be clear.

Let's say we have a table called TEMP_STATS to which we apply the custom
combiner. During ingestion we simply insert a row, i.e. ['foo', 1], into the
table; the next time we insert ['foo', 1] again, and so on. Say we have 10
rows of 'foo', so reading that word returns 'foo', 10 (thanks to the
combiner). Now I want to delete only one row, so that it returns 'foo', 9
instead. What is the best way to do this?
One option I can think of is to add another identifier, i.e. a sequence
number, so we'd insert ['foo', 1, 1], ['foo', 2, 1], and so on (the second
number is the seq# and can be stored as the column qualifier). Then I would
have to modify the combiner so that it also returns the highest seq# (i.e.
'foo', 10, 10). When deleting a single item, I could just issue a delete for
'foo', :10, and only that row would be marked as deleted. Is there a better
approach?


Thanks,
Z




Re: using combiner vs. building stats cache

Posted by Russ Weeks <rw...@newbrightidea.com>.
You only pay the cost at scan-time for values that haven't been compacted.
If you snapshot your stats table after a compaction and then do your scans
on the snapshot, then you would get the best of options 1 and 2. The
drawback is that the values returned by the scan would not be totally
up-to-date.
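
One way to take such a snapshot is a compaction followed by a table clone,
which is cheap because the clone shares the compacted files; a fragment,
with illustrative table names and 'conn' a Connector:

  conn.tableOperations().compact("stats", null, null, true, true);  // flush + full compaction, wait
  conn.tableOperations().clone("stats", "stats_snapshot",
      true,                              // flush before cloning
      new HashMap<String, String>(),     // no property overrides
      new HashSet<String>());            // no properties to exclude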

Regards,
-Russ

On Wed, Aug 26, 2015 at 9:06 PM Dylan Hutchison <dh...@mit.edu> wrote:

> Go for option #2 and use the combiners.  It's one of the core features of
> Accumulo and the overhead at insert-time is minimal.  Developer time
> overhead is also minimal-- add a couple lines next to where you make your
> mutations and you're done.
>
> Regards, Dylan
>
> On Wed, Aug 26, 2015 at 6:11 PM, z11373 <z1...@outlook.com> wrote:
>
> > Hi,
> > Apologize if this question has been asked before (which I am kind of
> > certain).
> > I am building a triple store, and need to build the stats table which
> will
> > be used for query optimization (i.e. re-order the query triple pattern).
> > There may be more than 2 solutions for this, but the two I know are:
> > 1. Manually rebuild the whole stats, this can be run once per day for
> > example
> > This option would be expensive because we are re-calculating all rows in
> > master table, but the end result is no more computation when we retrieve
> > the
> > stat info. For example, we'll just query stats table for word 'foo', and
> > it'll return a single row with total items for that word.
> >
> > 2. Use Accumulo combiner
> > With this option, we could simply add the counter to the stats table
> (i.e.
> > insert ['foo', 1]) whenever we insert 'foo' to master table. When we want
> > to
> > get the stat info during query time, Accumulo will actually aggregate all
> > the count for that word 'foo' in map-reduce fashion.
> > For #2, we pay the cost during scan time, but if the rows that have word
> > 'foo' only in hundredth, I guess it won't be so bad, because that
> > aggregation will be done on the server side (and it'd be optimized due to
> > Accumulo design)
> >
> > I prefer option #2, but not sure how expensive is that on Accumulo,
> > especially we'll do a big number of queries per day, than that stats
> > re-calculating process which is once per day. Any comments on this?
> > Please let me know if my problem statement or the question is unclear.
> >
> >
> > Thanks,
> > Z
> >
> >
> >
> >
>

Re: using combiner vs. building stats cache

Posted by Dylan Hutchison <dh...@mit.edu>.
Go for option #2 and use the combiners.  It's one of the core features of
Accumulo and the overhead at insert-time is minimal.  Developer time
overhead is also minimal-- add a couple lines next to where you make your
mutations and you're done.

Regards, Dylan

On Wed, Aug 26, 2015 at 6:11 PM, z11373 <z1...@outlook.com> wrote:

> Hi,
> Apologize if this question has been asked before (which I am kind of
> certain).
> I am building a triple store, and need to build the stats table which will
> be used for query optimization (i.e. re-order the query triple pattern).
> There may be more than 2 solutions for this, but the two I know are:
> 1. Manually rebuild the whole stats, this can be run once per day for
> example
> This option would be expensive because we are re-calculating all rows in
> master table, but the end result is no more computation when we retrieve
> the
> stat info. For example, we'll just query stats table for word 'foo', and
> it'll return a single row with total items for that word.
>
> 2. Use Accumulo combiner
> With this option, we could simply add the counter to the stats table (i.e.
> insert ['foo', 1]) whenever we insert 'foo' to master table. When we want
> to
> get the stat info during query time, Accumulo will actually aggregate all
> the count for that word 'foo' in map-reduce fashion.
> For #2, we pay the cost during scan time, but if the rows that have word
> 'foo' only in hundredth, I guess it won't be so bad, because that
> aggregation will be done on the server side (and it'd be optimized due to
> Accumulo design)
>
> I prefer option #2, but not sure how expensive is that on Accumulo,
> especially we'll do a big number of queries per day, than that stats
> re-calculating process which is once per day. Any comments on this?
> Please let me know if my problem statement or the question is unclear.
>
>
> Thanks,
> Z
>
>
>
>