Posted to user@accumulo.apache.org by "Slater, David M." <Da...@jhuapl.edu> on 2014/05/20 19:10:56 UTC
Improving Batchscanner Performance
Hey everyone,
I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is set up as follows:
row       colFam    colQual    value
rowUID    --        --         byte[] of data
These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
// returns the data associated with the given collection of rows
public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
    List<byte[]> values = new ArrayList<byte[]>(rows.size());
    if (!rows.isEmpty()) {
        BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
        List<Range> ranges = new ArrayList<Range>();
        for (Text row : rows) {
            ranges.add(new Range(row));
        }
        scanner.setRanges(ranges);
        for (Map.Entry<Key, Value> entry : scanner) {
            values.add(entry.getValue().get());
        }
        scanner.close();
    }
    return values;
}
Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
Thanks,
David
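A rough illustration of why uniformly random row IDs are expensive for a lookup like the one above. The BatchScanner groups the supplied ranges by tablet before it contacts the tablet servers, so the more tablets a set of rows spans, the more separate requests are needed. The sketch below is not Accumulo code: it simulates that grouping with a sorted set of made-up split points, and the names `TabletBinning` and `binByTablet` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class TabletBinning {
    // Groups row IDs by the tablet that would hold them. A tablet is identified
    // here by its end row (the first split point >= the row). Rows past the last
    // split fall into the final tablet, keyed as "+inf" in this simulation.
    static Map<String, List<String>> binByTablet(Collection<String> rows, NavigableSet<String> splits) {
        Map<String, List<String>> bins = new TreeMap<>();
        for (String row : rows) {
            String endRow = splits.ceiling(row);
            String tablet = (endRow == null) ? "+inf" : endRow;
            bins.computeIfAbsent(tablet, k -> new ArrayList<>()).add(row);
        }
        return bins;
    }

    public static void main(String[] args) {
        // Hypothetical split points and row IDs, chosen only for illustration.
        NavigableSet<String> splits = new TreeSet<>(Arrays.asList("4", "8", "c"));
        List<String> scattered = Arrays.asList("1a", "5b", "9c", "e0");
        List<String> clustered = Arrays.asList("1a", "1b", "2c", "3d");
        System.out.println(binByTablet(scattered, splits).size() + " tablets touched (scattered rows)");
        System.out.println(binByTablet(clustered, splits).size() + " tablets touched (clustered rows)");
    }
}
```

With these example values the scattered rows land in four different tablets while the clustered rows share one, which is the locality effect discussed in the replies.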
RE: Improving Batchscanner Performance
Posted by Bo...@l-3com.com.
David
I use this same pattern and have for the last couple of years. What I have put in place is a cache (redis) between the batchscanner thread that reads the index tables and a separate consumer thread that does the final lookup of the rowIDs. The rowID pool can grow as it needs to in redis, and the consumer thread pulls whatever IDs it needs to do the final scan to get my row data. Tuning the batch size for the scanner to a small number that matches the consumer's needs has helped us keep the final scanner from blocking on too much data or flooding the network with unneeded data between the server and client(s).
I would recommend you reconsider the use of a UUID for the rowID. We found a significant performance improvement by using an organic rowID that fit the data/purpose better, which orders the data on fewer tablet servers most of the time. Adding more threads to a batch scanner just because we used random rowIDs seems to have diminishing returns as the rowIDs approach randomness.
HTH
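A minimal sketch of the hand-off described above, with the assumptions stated up front: it swaps redis for an in-process `ArrayBlockingQueue`, plain strings stand in for rowIDs, and `lookupBatch` is a hypothetical placeholder for the real data-table scan. None of the names (`PipelinedLookup`, `run`, `DONE`) come from Accumulo or from this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

public class PipelinedLookup {
    // Unique marker object, compared by identity, that signals "no more row IDs".
    private static final String DONE = new String("DONE");

    // Streams row IDs from a producer thread through a bounded queue and
    // resolves them in small batches on the calling thread.
    static List<String> run(Iterable<String> indexResults, int queueCapacity, int batchSize,
                            Function<List<String>, List<String>> lookupBatch) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);

        Thread producer = new Thread(() -> {
            try {
                for (String rowId : indexResults) {
                    queue.put(rowId); // blocks when the queue is full, which bounds memory use
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true); // do not keep the JVM alive if the consumer fails
        producer.start();

        List<String> results = new ArrayList<>();
        List<String> batch = new ArrayList<>(batchSize);
        try {
            while (true) {
                String rowId = queue.take();
                boolean finished = (rowId == DONE);
                if (!finished) {
                    batch.add(rowId);
                }
                // Flush a full batch, or the final partial batch once the producer is done.
                if (batch.size() == batchSize || (finished && !batch.isEmpty())) {
                    results.addAll(lookupBatch.apply(batch));
                    batch = new ArrayList<>(batchSize);
                }
                if (finished) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for row IDs", e);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        List<String> out = run(ids, 2, 2, b -> {
            List<String> vals = new ArrayList<>();
            for (String id : b) {
                vals.add("value-of-" + id); // stand-in for fetching the row's value
            }
            return vals;
        });
        System.out.println(out);
    }
}
```

The bounded queue is the design point: the index scan can run ahead of the data lookup, but only by `queueCapacity` entries, so neither side accumulates an unbounded set of rowIDs in memory.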
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
Ah, here is some rfile info:
hadoop fs -ls
-rw-r--r-- 2 hadoop supergroup 88638093 2014-05-21 12:44 /accumulo/tables/32/t-0014fhi/A0014h25.rf
rfile.info
Locality group : <DEFAULT>
Start block : 0
Num blocks : 2,938
Index level 1 : 290 bytes 1 blocks
Index level 0 : 297,686 bytes 3 blocks
First key : 14006808|19|000001dd-04f9-4dc4-9d0e-0d7070d6ebaa ipdst:128.244.17.61 [] 1400684778699 false
Last key : 14006808|19|ffffefd8-9b71-4a92-aa4d-06259dcc1e58 ttl:64 [] 1400685445408 false
Num entries : 7,906,420
Column families : [ttl, ipdst, question, response,PTR, response,NS, timestamp, ipsrc, response,A, response,TXT, response,CNAME]
Meta block : BCFile.index
Raw size : 4 bytes
Compressed size : 12 bytes
Compression type : gz
Meta block : RFile.index
Raw size : 620 bytes
Compressed size : 414 bytes
Compression type : gz
Meta block : acu_bloom
Raw size : 3,080,506 bytes
Compressed size : 2,502,618 bytes
Compression type : gz
Thanks,
David
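For readers interpreting the rfile.info dump above, a small worked calculation may help. It uses only three numbers from the dump (file size, "Num blocks", "Num entries"); the class and method names are hypothetical. Since a lookup generally has to read a whole data block to find one key, the average entries per block gives a rough sense of the work behind each random read. The file size also includes the index and bloom filter, so the bytes-per-block figure is a slight overestimate.

```java
public class RFileBlockStats {
    // Average number of entries stored in each data block (integer division).
    static long entriesPerBlock(long numEntries, long numBlocks) {
        return numEntries / numBlocks;
    }

    // Average on-disk bytes per data block, computed from the whole file size.
    static long bytesPerBlock(long fileBytes, long numBlocks) {
        return fileBytes / numBlocks;
    }

    public static void main(String[] args) {
        long fileBytes = 88_638_093L;  // size reported by "hadoop fs -ls"
        long numBlocks = 2_938L;       // "Num blocks" from rfile.info
        long numEntries = 7_906_420L;  // "Num entries" from rfile.info

        System.out.println("entries per block: " + entriesPerBlock(numEntries, numBlocks)); // 2691
        System.out.println("bytes per block:   " + bytesPerBlock(fileBytes, numBlocks));    // 30169
    }
}
```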
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
Inline'd this time
On 5/21/14, 12:58 PM, Slater, David M. wrote:
> You are correct that the "bin" is largely redundant. I created that because I was not guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed), and I'm not the one who specified it. There is another mechanism I didn't mention, which is that the bin is prepended by a timeblock (typically an hour span), and my data is streaming. So essentially, I create a number of splits for the next timeblock for X bins, and then when the data input moves into that time block it can ingest directly onto empty tablets.
Gotcha.
> I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and if I'm reading it correctly:
Oops, you're right, I think it was introduced in 1.5, but it's just a
wrapper. You can invoke the PrintInfo class directly:
accumulo org.apache.accumulo.core.file.rfile.PrintInfo '/path/to/rfile.rf'
> 31;14006844|00 file:/t-0014fpy/A0014h4u.rf [] 155454467,5450454
>
> This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin combination.
>
> After the data gets over a day old, I do a nightly job to merge the bins for each timeblock together, resulting in data like:
> 31;14000292|00 file:/t-0011bgk/C0011e06.rf [] 1922144744,67390597
> 31;14000292|00 file:/t-0011bgk/C0011ed3.rf [] 1942040855,68058489
>
> This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5% of the files, if I'm reading it correctly.
I think you're confused about what those numbers mean. The two numbers
in the Value are size in bytes and number of entries, not data size and
index size.
This means that your entries are about 30 bytes in size, which seems in
line with what you described given the encoding/compression Accumulo is
doing.
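The "about 30 bytes" figure can be checked directly from the quoted metadata values, reading each one as "size in bytes, number of entries" as described above. This is only a worked calculation; `EntrySize` and `bytesPerEntry` are hypothetical names, not part of Accumulo.

```java
public class EntrySize {
    // Average bytes per entry for a metadata file value of the form "sizeBytes,numEntries".
    static double bytesPerEntry(String metadataValue) {
        String[] parts = metadataValue.split(",");
        long sizeBytes = Long.parseLong(parts[0].trim());
        long numEntries = Long.parseLong(parts[1].trim());
        return (double) sizeBytes / numEntries;
    }

    public static void main(String[] args) {
        // The three values quoted from the !METADATA table in this thread.
        String[] values = {
            "155454467,5450454",
            "1922144744,67390597",
            "1942040855,68058489"
        };
        for (String v : values) {
            System.out.printf("%s -> %.1f bytes/entry%n", v, bytesPerEntry(v));
        }
    }
}
```

All three files come out at roughly 28.5 bytes per entry.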
You could try playing with table.file.compress.blocksize. IIRC, if you
reduce this value from the default 100k, you would get more blocks per
RFile, which means that you get more index records, which, in turn,
would mean you can find your records faster at the cost of a larger index.
> In total, there are about 440 tablets per server, with 4 servers, storing a total of about 2.1 TB of data (each server has a single 1 TB HDD).
>
> I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo to do that, or are bloom filters normally generated? I have an index cache of 256M for each tserver.
After a quick glance at the code, it appears that every time a Reader is
opened to a file, we check the configuration and use a BloomFilter if
enabled. I don't think you need to restart the tservers.
> Thanks,
> David
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Wednesday, May 21, 2014 12:18 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual for that matter).
>
> A few more questions that come to mind though...
>
> * What's the purpose of the "bin"? Your guid is likely random anyways which will give you uniformity (which is what a bin prefix like that is usually meant to provide).
>
> * How many splits do you have on this table? At least a few per tserver?
>
> You could also try looking at the size of the index for a couple of rfiles for your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster lookups than what you noted.
>
> On 5/20/14, 4:34 PM, Slater, David M. wrote:
>> 10-100 entries per node (4 nodes total).
>>
>> Would changing the data table structure change the batchscanner performance?
>>
>> I'm using:
>> row         colFam    colQual    value
>> bin|guid    --        --         byte[]
>>
>> would it be faster/slower to use:
>> row    colFam    colQual    value
>> bin    guid      --         byte[]
>>
>> The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 3:17 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
>>
>> A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for the excessive RPCs; too large and, like you're seeing, it takes longer for you to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
>>
>> Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
>>
>> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>>> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
>>>
>>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>>
>>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 2:42 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>>>
>>> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you to the cost of fetching random sets of rows and data.
>>>
>>> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotting on a few of them (the monitor should help here)? It can sometimes be a bit of a balancing act to optimize locality while avoiding hotspots.
>>>
>>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>>> Josh,
>>>>
>>>> The data is not significantly larger than the rows that I'm fetching. in terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>>>
>>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>>>
>>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> -----Original Message-----
>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>>> To: user@accumulo.apache.org
>>>> Subject: Re: Improving Batchscanner Performance
>>>>
>>>> You actually stated it exactly here:
>>>>
>>>> > I complete the first scan in its entirety
>>>>
>>>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>>
>>>> > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
>>>> > Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>>
>>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>>>
>>>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>>>
>>>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>>>
>>>>
>>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>>> Hi Josh,
>>>>>
>>>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>>>
>>>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>>>
>>>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>>>
>>>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>>     Set<Text> guids = new HashSet<Text>();
>>>>>     if (!ranges.isEmpty()) {
>>>>>         BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>         try {
>>>>>             scanner.setRanges(ranges);
>>>>>             scanner.fetchColumnFamily(term);
>>>>>             for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                 guids.add(entry.getKey().getColumnQualifier());
>>>>>                 if (guids.size() > limit) {
>>>>>                     return null; // more than `limit` matches: give up
>>>>>                 }
>>>>>             }
>>>>>         } finally {
>>>>>             scanner.close(); // also runs on the early return above
>>>>>         }
>>>>>     }
>>>>>     return guids;
>>>>> }
>>>>>
>>>>> Essentially, my query does:
>>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
>>>>> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>>> To: user@accumulo.apache.org
>>>>> Subject: Re: Improving Batchscanner Performance
>>>>>
>>>>> Hi David,
>>>>>
>>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>>>
>>>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>>>
>>>>> You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>>>
>>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>>>
>>>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>>>
>>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>>> Hey everyone,
>>>>>>
>>>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>>>> row colFam colQual value
>>>>>> rowUID -- -- byte[] of data
>>>>>>
>>>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>>>
>>>>>> // returns the data associated with the given collection of rows
>>>>>> public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>> List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>> if (!rows.isEmpty()) {
>>>>>> BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>> List<Range> ranges = new ArrayList<Range>();
>>>>>> for (Text row : rows) {
>>>>>> ranges.add(new Range(row));
>>>>>> }
>>>>>> scanner.setRanges(ranges);
>>>>>> for (Map.Entry<Key, Value> entry : scanner) {
>>>>>> values.add(entry.getValue().get());
>>>>>> }
>>>>>> scanner.close();
>>>>>> }
>>>>>> return values;
>>>>>> }
>>>>>>
>>>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
You are correct that the "bin" is largely redundant. I created that because I was not guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed), and I'm not the one who specified it. There is another mechanism I didn't mention, which is that the bin is prepended by a timeblock (typically an hour span), and my data is streaming. So essentially, I create a number of splits for the next timeblock for X bins, and then when the data input moves into that time block it can ingest directly onto empty tablets.
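The pre-splitting step described above can be sketched roughly as follows. This is only an illustration: the "timeblock|bin" row layout and the two-digit bin width are assumptions inferred from the metadata rows quoted in this message, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class TimeblockSplits {
    // Builds one split point per bin for a given timeblock,
    // e.g. "14006845|00", "14006845|01", ...
    // Assumption: rows look like "timeblock|bin" with a two-digit bin.
    static List<String> splitsFor(long timeblock, int bins) {
        List<String> splits = new ArrayList<>();
        for (int bin = 0; bin < bins; bin++) {
            splits.add(String.format("%d|%02d", timeblock, bin));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Print the split points for the next timeblock with 4 bins.
        for (String split : splitsFor(14006845L, 4)) {
            System.out.println(split);
        }
    }
}
```

In a real client these strings would presumably be wrapped in Text objects and handed to the table operations' addSplits call before ingest reaches that timeblock.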
I don't think rfile-info is available in 1.4, but I looked at the !METADATA table, and if I'm reading it correctly:
31;14006844|00 file:/t-0014fpy/A0014h4u.rf [] 155454467,5450454
This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin combination.
After the data gets over a day old, I do a nightly job to merge the bins for each timeblock together, resulting in data like:
31;14000292|00 file:/t-0011bgk/C0011e06.rf [] 1922144744,67390597
31;14000292|00 file:/t-0011bgk/C0011ed3.rf [] 1942040855,68058489
This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5% of the files, if I'm reading it correctly.
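The arithmetic behind that percentage can be checked with a small sketch like the one below. It only reproduces the division; reading the second number of each pair as index bytes is the interpretation used in this message and is treated here as an assumption that should be checked against the metadata format for your Accumulo version. The class and method names are hypothetical.

```java
class IndexRatio {
    // Each value is a "first,second" pair as it appears in the metadata rows above.
    // Returns sum(second) / sum(first).
    // Assumption: "second" is the index size in bytes, as read in this message.
    static double ratio(String... values) {
        long first = 0;
        long second = 0;
        for (String value : values) {
            String[] parts = value.split(",");
            first += Long.parseLong(parts[0]);
            second += Long.parseLong(parts[1]);
        }
        return (double) second / first;
    }

    public static void main(String[] args) {
        double merged = ratio("1922144744,67390597", "1942040855,68058489");
        double single = ratio("155454467,5450454");
        System.out.printf("merged: %.4f, single: %.4f%n", merged, single);
    }
}
```

Both the merged files and the single timeblock|bin file come out at roughly the same fraction, which is where the 3.5% figure comes from.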
In total, there are about 440 tablets per server, with 4 servers, storing a total of about 2.1 TB of data (each server has a single 1 TB HDD).
I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo to do that, or are bloom filters normally generated? I have an index cache of 256M for each tserver.
Thanks,
David
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual for that matter).
A few more questions that come to mind though...
* What's the purpose of the "bin"? Your guid is likely random anyways which will give you uniformity (which is what a bin prefix like that is usually meant to provide).
* How many splits do you have on this table? At least a few per tserver?
You could also try looking at the size of the index for a couple of rfiles for your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster lookups than what you noted.
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
10-100 entries per node (4 nodes total).
Would changing the data table structure change the batchscanner performance?
I'm using:
row        colFam   colQual   value
bin|guid   --       --        byte[]
would it be faster/slower to use:
row        colFam   colQual   value
bin        guid     --        byte[]
The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
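To make the difference concrete, here is a minimal sketch of what the client would have to build for each layout. It uses plain strings instead of Accumulo types, the "bin|guid" separator is taken from the table above, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class LayoutSketch {
    // Layout 1 needs one single-row range per "bin|guid" id, so the ids are used as-is.
    // Layout 2 needs one range per bin plus the guids to fetch as column families,
    // so the ids are grouped by bin here.
    // Assumption: the guid itself never contains the '|' separator.
    static SortedMap<String, SortedSet<String>> groupByBin(Collection<String> rowIds) {
        SortedMap<String, SortedSet<String>> byBin = new TreeMap<>();
        for (String rowId : rowIds) {
            int sep = rowId.lastIndexOf('|');
            String bin = rowId.substring(0, sep);
            String guid = rowId.substring(sep + 1);
            byBin.computeIfAbsent(bin, b -> new TreeSet<>()).add(guid);
        }
        return byBin;
    }

    public static void main(String[] args) {
        // Three lookups that fall into two bins.
        System.out.println(groupByBin(Arrays.asList("00|a1", "01|b2", "00|c3")));
    }
}
```

One caveat, as far as I know: column families set on a scanner apply to every range it scans rather than to a single range, so with the second layout the guids for one bin could not easily be kept separate from those of another within one BatchScanner.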
-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com]
Sent: Tuesday, May 20, 2014 3:17 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance
10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for the excessive RPCs; too large and, like you're seeing, it takes longer for you to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
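A back-of-the-envelope sketch of that pause, under stated assumptions: the server is taken to find matching entries at the same 100 entries/s the client reports, and each entry is taken to be 200 bytes (the thread only says "100s of bytes"). The class and method names are hypothetical.

```java
class FirstBatchDelay {
    // Seconds needed to fill a buffer of bufferBytes when matching entries
    // arrive at entriesPerSecond and each entry occupies bytesPerEntry.
    static double secondsToFill(long bufferBytes, double entriesPerSecond, int bytesPerEntry) {
        return bufferBytes / (entriesPerSecond * bytesPerEntry);
    }

    public static void main(String[] args) {
        // 1M buffer versus a 64K buffer at the assumed rate and entry size.
        System.out.println(secondsToFill(1024L * 1024L, 100.0, 200));
        System.out.println(secondsToFill(64L * 1024L, 100.0, 200));
    }
}
```

Under those assumptions a 1M buffer would take on the order of a minute to fill, while a 64K buffer would fill in a few seconds, which matches the shape of the trade-off described above.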
Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
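The cost of a merged read can be illustrated with a generic k-way merge over sorted lists. This is a sketch of the general technique, not Accumulo's actual iterator stack; each list stands in for one sorted rfile, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class MergedRead {
    // Merges several individually sorted key lists into one sorted list.
    // Every key read costs a heap operation whose cost grows with the number of files.
    static List<String> merge(List<List<String>> files) {
        // Heap entry: {file index, position in that file}, ordered by the key at that position.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] e) -> files.get(e[0]).get(e[1])));
        for (int f = 0; f < files.size(); f++) {
            if (!files.get(f).isEmpty()) {
                heap.add(new int[] {f, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] entry = heap.poll();
            List<String> file = files.get(entry[0]);
            merged.add(file.get(entry[1]));
            // Advance within the same file if it has more keys.
            if (entry[1] + 1 < file.size()) {
                heap.add(new int[] {entry[0], entry[1] + 1});
            }
        }
        return merged;
    }
}
```

With more files per tablet, every lookup also has to seek in each file before the merge can start, which is one reason compacting down to fewer files tends to help random reads.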
On 5/20/14, 3:08 PM, Slater, David M. wrote:
> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
>
> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>
> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 2:42 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to the tablets hosted by each tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>
> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you to the cost of fetching random sets of rows and data.
>
> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few of them (the monitor should help here)? It can sometimes be a bit of a balancing act to optimize locality while avoiding hotspots.
>
> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>> Josh,
>>
>> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>
>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>
>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>
>> Thanks,
>> David
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 1:59 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> You actually stated it exactly here:
>>
>> > I complete the first scan in its entirety
>>
>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>
>> > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
>> > Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>
>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>
>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>
>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>
>>
>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>> Hi Josh,
>>>
>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>
>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>
>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>
>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>> Set<Text> guids = new HashSet<Text>();
>>> if (!ranges.isEmpty()) {
>>> BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>> scanner.setRanges(ranges);
>>> scanner.fetchColumnFamily(term);
>>> for (Map.Entry<Key, Value> entry : scanner) {
>>> guids.add(entry.getKey().getColumnQualifier());
>>> if (guids.size() > limit) {
>>> scanner.close(); // release the scanner before the early return
>>> return null;
>>> }
>>> }
>>> scanner.close();
>>> }
>>> return guids;
>>> }
>>>
>>> Essentially, my query does:
>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new
>>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data =
>>> getRowData(rows, "mytable", 10);
>>>
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> Hi David,
>>>
>>> Absolutely. What you have here is a classic producer-consumer model.
>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>
>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>
>>> You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>
>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>
>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>
>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>> Hey everyone,
>>>>
>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>> row colFam colQual value
>>>> rowUID -- -- byte[] of data
>>>>
>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>
>>>> // returns the data associated with the given collection of rows
>>>> public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>> List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>> if (!rows.isEmpty()) {
>>>> BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>> List<Range> ranges = new ArrayList<Range>();
>>>> for (Text row : rows) {
>>>> ranges.add(new Range(row));
>>>> }
>>>> scanner.setRanges(ranges);
>>>> for (Map.Entry<Key, Value> entry : scanner) {
>>>> values.add(entry.getValue().get());
>>>> }
>>>> scanner.close();
>>>> }
>>>> return values;
>>>> }
>>>>
>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>
>>>> Thanks,
>>>> David
>>>>
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
10-100 entries/s seems slow, but that's mostly a gut feeling without
context. Is this over more than one node? 10s of nodes?
A value of 1M would explain the pause that you see in the
beginning. That parameter controls the size of the buffer that each
tserver will fill before sending data back to the BatchScanner. Too
small and you pay for the excessive RPCs; too large and, as you're
seeing, it takes longer for you to get the first batch. You should be
able to reduce that value and see a much quicker first result come out
of the batchscanner.
Number of rfiles could impact read performance as you have to do a
merged-read over all of the rfiles for a tablet.
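To make the buffer-size trade-off concrete, here is a rough back-of-the-envelope sketch in Java. It is not Accumulo code. The 200-byte entry size is an assumption standing in for "100s of bytes", the class and method names are made up for illustration, and it pretends that a tserver reads at the observed 10-100 entries/s and must fill the whole buffer before returning anything. In practice a tserver also returns once it has finished its ranges, so treat the numbers as a pessimistic estimate only.

```java
final class FirstBatchDelay {
    /** Entries that fit in one server-side buffer, ignoring key overhead. */
    static long entriesPerBuffer(long bufferBytes, long bytesPerEntry) {
        return bufferBytes / bytesPerEntry;
    }

    /** Seconds needed to fill the buffer at the given read rate. */
    static double secondsToFill(long bufferBytes, long bytesPerEntry, double entriesPerSecond) {
        return entriesPerBuffer(bufferBytes, bytesPerEntry) / entriesPerSecond;
    }

    public static void main(String[] args) {
        long bytesPerEntry = 200;               // assumption: "100s of bytes" per record
        long[] bufferSizes = {1024L * 1024L,    // 1M, the value mentioned in the thread
                              64L * 1024L};     // a smaller value, chosen only for comparison
        double[] rates = {10.0, 100.0};         // entries/s range reported in the thread

        for (long buffer : bufferSizes) {
            for (double rate : rates) {
                System.out.printf("buffer=%d bytes, %.0f entries/s: ~%d entries, ~%.1f s to fill%n",
                        buffer, rate, entriesPerBuffer(buffer, bytesPerEntry),
                        secondsToFill(buffer, bytesPerEntry, rate));
            }
        }
    }
}
```

Under these assumptions a smaller buffer shortens the wait for the first batch roughly in proportion to its size, at the cost of more round trips for the same amount of data.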
On 5/20/14, 3:08 PM, Slater, David M. wrote:
> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
>
> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>
> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
I don't think this would be a problem with table.scan.max.memory=1M, would it?
Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com]
Sent: Tuesday, May 20, 2014 2:42 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance
No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by the tablet servers. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you to the cost of fetching random sets of rows and data.
If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few of them (the monitor should help here)? It can sometimes be a bit of a balancing act to optimize locality while avoiding hotspots.
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
No, that is how it's done. The ranges that you provide to the
BatchScanner are binned to tablets hosted by the tablet servers. It will
then query up to numQueryThreads tservers at once to fetch results in
parallel.
The point I was making is that you can only bin ranges within the scope
of a single BatchScanner, and if you were making repeated calls to your
original function with differing arguments, you might be incurring some
more penalty. Like Bob, I was trying to lead you to the cost of fetching
random sets of rows and data.
If the bandwidth of fetching the data is not a factor, I would probably
agree that random reads are an issue. Do you have more details you can
give about how long it takes to fetch the data for N rows (e.g. number
of key-values/second and/or amount of data/second)? Are you getting an
even distribution across your tservers, or are you hot-spotted on a few
of them (the monitor should help here)? It can sometimes be a bit of a
balancing act to optimize locality while avoiding hotspots.
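As an illustration of what "binning" means here, the following is a minimal, self-contained Java sketch. It is not the BatchScanner's actual implementation. It assumes each tablet covers the rows up to and including its end row (its split point), with a final tablet that has no end row. The class name, method name, and the "<last>" label are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class RangeBinning {
    // Label for the final tablet, which has no end row (hypothetical naming).
    static final String LAST_TABLET = "<last>";

    /**
     * Groups single-row lookups by the tablet that would hold them.
     * A tablet is identified by its end row; a row belongs to the tablet
     * with the smallest end row that is greater than or equal to the row.
     */
    static Map<String, List<String>> binByTablet(TreeSet<String> splitPoints, List<String> rows) {
        Map<String, List<String>> bins = new TreeMap<>();
        for (String row : rows) {
            String endRow = splitPoints.ceiling(row); // null when the row sorts after every split
            String tablet = (endRow == null) ? LAST_TABLET : endRow;
            bins.computeIfAbsent(tablet, k -> new ArrayList<>()).add(row);
        }
        return bins;
    }

    public static void main(String[] args) {
        TreeSet<String> splits = new TreeSet<>(Arrays.asList("g", "n", "t"));
        // The input order of the rows does not decide which tablet serves them.
        List<String> rows = Arrays.asList("zebra", "apple", "melon", "grape", "banana");
        System.out.println(binByTablet(splits, rows));
    }
}
```

The point of the sketch is that the grouping depends only on the split points, not on the order of the input collection, and that all ranges handed to one scanner can be grouped together in a single pass.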
On 5/20/14, 2:24 PM, Slater, David M. wrote:
> Josh,
>
> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>
> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>
> Does the BatchScanner fetch rows based on the ordering of the Collection?
>
> Thanks,
> David
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
Josh,
The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
Does the BatchScanner fetch rows based on the ordering of the Collection?
Thanks,
David
-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com]
Sent: Tuesday, May 20, 2014 1:59 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance
You actually stated it exactly here:
> I complete the first scan in its entirety
Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty for talking to each server, when you could amortize the cost by fetching the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost of talking to many servers.
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
You actually stated it exactly here:
> I complete the first scan in its entirety
Loading the data into a Collection also implies that you're loading the
complete set of rows and blocking until you find all rows, or until you
fetch all of the data.
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
Both the BatchScanner and Scanner are returning KeyValue pairs in
"batches". The client talks to server(s), reads some data and returns it
to you. By virtue of you loading these results from the Iterator into a
Collection, you are consuming *all* results before proceeding to fetch
the data for the rows.
Now, if, like you said, looking up the rows is drastically faster than
fetching the data, there's a question as to why this is. Is it safe to
assume that the data is much larger than the rows you're fetching? Have
you tried to see what the throughput of fetching this data is? If it's
bounded by network speed, you could try compressing the data in an
iterator server-side before returning it to the client.
You could also consider the locality of the rows that you're fetching --
are you fetching a "random" set of rows each time and paying a penalty
for talking to each server, when you could amortize the cost by fetching
the data for rows that are close together? A large amount of data being
returned is likely going to trump the additional cost of talking to many
servers.
On 5/20/14, 1:51 PM, Slater, David M. wrote:
> Hi Josh,
>
> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>
> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>
> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>
> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>     Set<Text> guids = new HashSet<Text>();
>     if (!ranges.isEmpty()) {
>         BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>         scanner.setRanges(ranges);
>         scanner.fetchColumnFamily(term);
>         for (Map.Entry<Key, Value> entry : scanner) {
>             guids.add(entry.getKey().getColumnQualifier());
>             if (guids.size() > limit) {
>                 return null;
>             }
>         }
>         scanner.close();
>     }
>     return guids;
> }
>
> Essentially, my query does:
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
Re: Improving Batchscanner Performance
Posted by William Slacum <wi...@accumulo.net>.
By "blocking", we mean you have to complete the entire index lookup before
fetching your records.
Conceptually, instead of returning a `Collection<Text> rows`, return an
`Iterator<Text> rows` and consume them in batches as the first lookup
produces them. That way record lookups can occur in parallel with index
lookups.
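A rough sketch of the "consume them in batches" idea, in plain Java with no Accumulo calls. BatchingIterator is a hypothetical helper, not part of any library; it wraps whatever Iterator the index lookup exposes and hands out fixed-size batches lazily, so the first batch is available before the index scan has finished.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: groups the elements of a source iterator into lists of
// at most batchSize elements, pulling from the source only when asked.
final class BatchingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;

    BatchingIterator(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        // Fill one batch; the last batch may be shorter than batchSize.
        List<T> batch = new ArrayList<T>(batchSize);
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}
```

Each batch would then be turned into the ranges for one record lookup. The batch size is a tuning knob, and whether the two lookups actually overlap depends on running them in separate threads.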
On Tue, May 20, 2014 at 1:51 PM, Slater, David M. <Da...@jhuapl.edu> wrote:
> Hi Josh,
>
> I should have clarified - I am using a batchscanner for both lookups. I
> had thought of putting it into two different threads, but the first scan is
> typically an order of magnitude faster than the second.
>
> The logic for upperbounding the results returned is outside of the method
> I provided. Since there is a one-to-one relationship between rowIDs and
> records on the second scan, I just limit the number of rows I send to this
> method.
>
> As for blocking, I'm not sure exactly what you mean. I complete the first
> scan in its entirety before entering this method with the collection of
> Text rowIDs. The method for that is:
>
> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>     Set<Text> guids = new HashSet<Text>();
>     if (!ranges.isEmpty()) {
>         BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>         scanner.setRanges(ranges);
>         scanner.fetchColumnFamily(term);
>         for (Map.Entry<Key, Value> entry : scanner) {
>             guids.add(entry.getKey().getColumnQualifier());
>             if (guids.size() > limit) {
>                 return null;
>             }
>         }
>         scanner.close();
>     }
>     return guids;
> }
>
> Essentially, my query does:
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
RE: Improving Batchscanner Performance
Posted by "Slater, David M." <Da...@jhuapl.edu>.
Hi Josh,
I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
    Set<Text> guids = new HashSet<Text>();
    if (!ranges.isEmpty()) {
        BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
        scanner.setRanges(ranges);
        scanner.fetchColumnFamily(term);
        for (Map.Entry<Key, Value> entry : scanner) {
            guids.add(entry.getKey().getColumnQualifier());
            if (guids.size() > limit) {
                return null;
            }
        }
        scanner.close();
    }
    return guids;
}
Essentially, my query does:
Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
Collection<byte[]> data = getRowData(rows, "mytable", 10);
-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com]
Sent: Tuesday, May 20, 2014 1:32 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance
Hi David,
Absolutely. What you have here is a classic producer-consumer model.
Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
Re: Improving Batchscanner Performance
Posted by Josh Elser <jo...@gmail.com>.
Hi David,
Absolutely. What you have here is a classic producer-consumer model.
Your BatchScanner is producing results, which you then consume by your
scanner, and ultimately return those results to the client.
The problem with your below implementation is that you're not going to
be polling your batchscanner as aggressively as you could be. You are
blocking while you can fetch each of those new Ranges from the Scanner
before fetching new ranges. Have you considered splitting up the
BatchScanner and Scanner code into two different threads?
You could easily use an ArrayBlockingQueue (or similar) to pass results
from the BatchScanner to the Scanner. I would imagine that this would
give you a fair improvement in performance.
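As a minimal sketch of that hand-off, assuming plain Java stand-ins for both scans: the producer thread below plays the role of the index lookup and the consuming loop plays the role of the record lookup. PipelinedLookup, the END marker, and the "record-for-" strings are all made up for illustration; none of this is Accumulo API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: a bounded queue between a producer (index scan stand-in)
// and a consumer (record lookup stand-in).
final class PipelinedLookup {
    // Unique marker object, compared by identity, that signals end of input.
    private static final String END = new String("END-OF-INDEX-SCAN");

    static List<String> run(final List<String> indexHits, int queueCapacity) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(queueCapacity);

        Thread producer = new Thread(() -> {
            try {
                for (String rowId : indexHits) {
                    queue.put(rowId); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> records = new ArrayList<String>();
        try {
            // take() blocks until the producer has something to hand over.
            for (String rowId = queue.take(); rowId != END; rowId = queue.take()) {
                records.add("record-for-" + rowId); // stand-in for the data fetch
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming row IDs", e);
        }
        return records;
    }
}
```

The fixed queue capacity also caps how many pending row IDs sit in memory at once, because put() blocks the producer when the consumer falls behind.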
Also, it doesn't appear that there's a reason you can't use a
BatchScanner for both lookups?
One final warning, your current implementation could also hog heap very
badly if your batchscanner returns too many records. The
producer/consumer I proposed should help here a little bit, but you
should still be asserting upper-bounds to avoid running out of heap
space in your client.
On 5/20/14, 1:10 PM, Slater, David M. wrote:
> Hey everyone,
>
> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
> row      colFam   colQual   value
> rowUID   --       --        byte[] of data
>
> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>
> // returns the data associated with the given collection of rows
> public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>     List<byte[]> values = new ArrayList<byte[]>(rows.size());
>     if (!rows.isEmpty()) {
>         BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>         List<Range> ranges = new ArrayList<Range>();
>         for (Text row : rows) {
>             ranges.add(new Range(row));
>         }
>         scanner.setRanges(ranges);
>         for (Map.Entry<Key, Value> entry : scanner) {
>             values.add(entry.getValue().get());
>         }
>         scanner.close();
>     }
>     return values;
> }
>
> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>
> Thanks,
> David
>