Posted to user@accumulo.apache.org by "Slater, David M." <Da...@jhuapl.edu> on 2014/05/20 19:10:56 UTC

Improving Batchscanner Performance

Hey everyone,

I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is set up as follows:
row     		colFam      	colQual     	value
rowUID  	 --          		--          		byte[] of data

These records are fairly small (100s of bytes), but numerous (I may return 50,000 or more). The method I use to retrieve them follows: essentially, I turn the rows returned from the first query into a set of ranges, pass them to the batchscanner, and collect the value from each returned entry.

// returns the data associated with the given collection of rows
    public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
        List<byte[]> values = new ArrayList<byte[]>(rows.size());
        if (!rows.isEmpty()) {
            BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
            List<Range> ranges = new ArrayList<Range>();
            for (Text row : rows) {
                ranges.add(new Range(row));
            }
            scanner.setRanges(ranges);
            for (Map.Entry<Key, Value> entry : scanner) {
                values.add(entry.getValue().get());
            }
            scanner.close();
        }
        return values;
    }

Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?

Thanks,
David

RE: Improving Batchscanner Performance

Posted by Bo...@l-3com.com.
David

I use this same pattern and have for the last couple of years. What I have put in place is a cache (redis) between the batchscanner thread that is reading the index tables and a separate consumer thread that does the final lookup of the rowIDs. The rowID pool can grow as it needs to in redis, then the consumer thread pulls whatever IDs it needs to do the final scan to get my row data. Tuning the batch size for the scanner to a small number that matches the consumer's needs has helped us keep the final scanner from blocking on too much data or flooding the network with unneeded data between the server and client(s).
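
A rough sketch of that handoff, with an in-memory BlockingQueue standing in for the redis layer (class, method, and parameter names here are placeholders, not from this thread): the index scan runs in its own thread and pushes rowIDs onto a bounded queue, while the consumer drains them in small batches and runs the data-table lookups.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;

    import org.apache.accumulo.core.client.BatchScanner;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;

    public class PipelinedLookup {
        private static final Text DONE = new Text();  // sentinel marking the end of index results

        // Producer: scan the index table and push rowIDs onto a bounded queue.
        static void produceRowIds(Connector conn, String indexTable, List<Range> indexRanges,
                                  Text term, int threads, BlockingQueue<Text> queue) throws Exception {
            BatchScanner indexScanner = conn.createBatchScanner(indexTable, new Authorizations(), threads);
            try {
                indexScanner.setRanges(indexRanges);
                indexScanner.fetchColumnFamily(term);
                for (Map.Entry<Key, Value> e : indexScanner) {
                    queue.put(new Text(e.getKey().getColumnQualifier())); // blocks if the consumer falls behind
                }
            } finally {
                indexScanner.close();
                queue.put(DONE);
            }
        }

        // Consumer: drain rowIDs in small batches and look them up in the data table.
        static List<byte[]> consumeRowData(Connector conn, String dataTable, int threads,
                                           int batchSize, BlockingQueue<Text> queue) throws Exception {
            List<byte[]> values = new ArrayList<byte[]>();
            List<Range> batch = new ArrayList<Range>(batchSize);
            boolean done = false;
            while (!done) {
                Text row = queue.take();
                if (row == DONE) {
                    done = true;
                } else {
                    batch.add(new Range(row));
                }
                if (!batch.isEmpty() && (batch.size() >= batchSize || done)) {
                    BatchScanner dataScanner = conn.createBatchScanner(dataTable, new Authorizations(), threads);
                    try {
                        dataScanner.setRanges(batch);
                        for (Map.Entry<Key, Value> e : dataScanner) {
                            values.add(e.getValue().get());
                        }
                    } finally {
                        dataScanner.close();
                    }
                    batch.clear();
                }
            }
            return values;
        }
    }

Run produceRowIds on its own thread with something like new ArrayBlockingQueue<Text>(10000); the queue capacity and batchSize play the same role as the batch-size tuning described above, bounding how much sits in memory between the two scans and letting the data lookups start before the index scan has finished.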

I would recommend you reconsider the use of UUID for the rowID; we found a significant performance improvement by using an organic rowID that fits the data/purpose better and orders the data on fewer tablet servers most of the time. Piling more threads into a batch scanner just because we used random rowIDs seems to have diminishing returns as the rowIDs approach full randomness.
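
For what it's worth, a sketch of what that kind of rowID might look like for streaming data (the hour bucket and sourceId fields are made-up placeholders, not taken from this thread): the idea is just to put a coarse, query-relevant prefix ahead of the unique part so records that are fetched together also sort near each other.

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    public class OrganicRowId {
        // Hypothetical rowID layout: <hour bucket>|<source>|<unique id>, instead of a bare UUID.
        static void writeRecord(BatchWriter writer, long timestampMillis, String sourceId,
                                String recordId, byte[] payload) throws Exception {
            String hourBucket = String.format("%010d", timestampMillis / (1000L * 60 * 60));
            String rowId = hourBucket + "|" + sourceId + "|" + recordId;
            Mutation m = new Mutation(new Text(rowId));
            // colFam/colQual left empty, matching the data-table layout shown above
            m.put(new Text(""), new Text(""), new Value(payload));
            writer.addMutation(m);
        }
    }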

HTH

-----Original Message-----
From: Slater, David M. [mailto:David.Slater@jhuapl.edu] 
Sent: Tuesday, May 20, 2014 12:11 PM
To: user@accumulo.apache.org
Subject: Improving Batchscanner Performance

Hey everyone,

I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
row     		colFam      	colQual     	value
rowUID  	 --          		--          		byte[] of data

These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them. 

// returns the data associated with the given collection of rows
    public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
        List<byte[]> values = new ArrayList<byte[]>(rows.size());
        if (!rows.isEmpty()) {
            BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
            List<Range> ranges = new ArrayList<Range>();
            for (Text row : rows) {
                ranges.add(new Range(row));
            }
            scanner.setRanges(ranges);
            for (Map.Entry<Key, Value> entry : scanner) {
                values.add(entry.getValue().get());
            }
            scanner.close();
        }
        return values;
    }

Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?

Thanks,
David

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
Ah, here is some rfile info:

hadoop fs -ls

-rw-r--r--   2 hadoop supergroup   88638093 2014-05-21 12:44 /accumulo/tables/32/t-0014fhi/A0014h25.rf

rfile-info output:

Locality group         : <DEFAULT>
        Start block          : 0
        Num   blocks         : 2,938
        Index level 1        : 290 bytes  1 blocks
        Index level 0        : 297,686 bytes  3 blocks
        First key            : 14006808|19|000001dd-04f9-4dc4-9d0e-0d7070d6ebaa ipdst:128.244.17.61 [] 1400684778699 false
        Last key             : 14006808|19|ffffefd8-9b71-4a92-aa4d-06259dcc1e58 ttl:64 [] 1400685445408 false
        Num entries          : 7,906,420
        Column families      : [ttl, ipdst, question, response,PTR, response,NS, timestamp, ipsrc, response,A, response,TXT, response,CNAME]

Meta block     : BCFile.index
      Raw size             : 4 bytes
      Compressed size      : 12 bytes
      Compression type     : gz

Meta block     : RFile.index
      Raw size             : 620 bytes
      Compressed size      : 414 bytes
      Compression type     : gz

Meta block     : acu_bloom
      Raw size             : 3,080,506 bytes
      Compressed size      : 2,502,618 bytes
      Compression type     : gz

Thanks,
David

-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com]
Sent: Wednesday, May 21, 2014 1:30 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

Inline'd this time

On 5/21/14, 12:58 PM, Slater, David M. wrote:
> You are correct that the "bin" is largely redundant. I created that because I was not guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed), and I'm not the one who specified it. There is another mechanism I didn't mention, which is that the bin is prepended by a timeblock (typically an hour span), and my data is streaming. So essentially, I create a number of splits for the next timeblock for X bins, and then when the data input moves into that time block it can ingest directly onto empty tablets.

Gotcha.

> I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and if I'm reading it correctly:

Oops, you're right, I think it was introduced in 1.5, but it's just a wrapper. You can invoke the PrintInfo class directly:

accumulo org.apache.accumulo.core.file.rfile.PrintInfo '/path/to/rfile.rf'

> 31;14006844|00 file:/t-0014fpy/A0014h4u.rf []    155454467,5450454
>
> This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin combination.
>
> After the data gets over a day old, I do a nightly job to merge the bins for each timeblock together, resulting in data like:
> 31;14000292|00 file:/t-0011bgk/C0011e06.rf []    1922144744,67390597
> 31;14000292|00 file:/t-0011bgk/C0011ed3.rf []    1942040855,68058489
>
> This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5% of the files, if I'm reading it correctly.

I think you're confused about what those numbers mean. The two numbers in the Value are size in bytes and number of entries, not data size and index size.

This means that your entries are about 30 bytes in size, which seems in line with what you described given the encoding/compression Accumulo is doing.

You could try playing with table.file.compress.blocksize. IIRC, if you reduce this value from the default 100k, you would get more blocks per RFile, which means that you get more index records, which, in turn, would mean you can find your records faster at the cost of a larger index.

> In total, there are about 440 tablets per server, with 4 servers, storing a total of about 2.1 TB of data (each server has a single 1 TB HDD).
>
> I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo to do that, or are bloom filters normally generated? I have an index cache of 256M for each tserver.

After a quick glance at the code, it appears that every time a Reader is opened to a file, we check the configuration and use a BloomFilter if enabled. I don't think you need to restart the tservers.

> Thanks,
> David
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Wednesday, May 21, 2014 12:18 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual for that matter).
>
> A few more questions that come to mind though...
>
> * What's the purpose of the "bin"? Your guid is likely random anyways which will give you uniformity (which is what a bin prefix like that is usually meant to provide).
>
> * How many splits do you have on this table? At least a few per tserver?
>
> You could also try looking at the size of the index for a couple of rfiles for your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster lookups than what you noted.
>
> On 5/20/14, 4:34 PM, Slater, David M. wrote:
>> 10-100 entries per node (4 nodes total).
>>
>> Would changing the data table structure change the batchscanner performance?
>>
>> I'm using:
>> row          colFam          colQual         value
>> bin|guid     --              --              byte[]
>>
>> would it be faster/slower to use:
>> row          colFam          colQual         value
>> bin          guid            --              byte[]
>>
>> The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 3:17 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
>>
>> A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for excessive RPCs; too large and, like you're seeing, it takes longer to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
>>
>> Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
>>
>> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>>> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan before any results come back at all. The ingest rate into this table is about 10k entries/s.
>>>
>>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>>
>>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 2:42 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>>>
>>> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you toward the cost of fetching random sets of rows and data.
>>>
>>> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few nodes (the monitor should help here)? It can sometimes be a bit of a balancing act between optimizing locality and avoiding hotspots.
>>>
>>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>>> Josh,
>>>>
>>>> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>>>
>>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>>>
>>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> -----Original Message-----
>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>>> To: user@accumulo.apache.org
>>>> Subject: Re: Improving Batchscanner Performance
>>>>
>>>> You actually stated it exactly here:
>>>>
>>>>      > I complete the first scan in its entirety
>>>>
>>>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>>
>>>>      > Collection<Text> rows = getRowIDs(new Range("minRow",
>>>> "maxRow"), new Text("index"), "mytable", 10, 10000);  >
>>>> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>>
>>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>>>
>>>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>>>
>>>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>>>
>>>>
>>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>>> Hi Josh,
>>>>>
>>>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>>>
>>>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>>>
>>>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>>>
>>>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>>              Set<Text> guids = new HashSet<Text>();
>>>>>              if (!ranges.isEmpty()) {
>>>>>                  BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>                  scanner.setRanges(ranges);
>>>>>                  scanner.fetchColumnFamily(term);
>>>>>                  for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                      guids.add(entry.getKey().getColumnQualifier());
>>>>>                      if (guids.size() > limit) {
>>>>>                          return null;
>>>>>                      }
>>>>>                  }
>>>>>                  scanner.close();
>>>>>              }
>>>>>              return guids;
>>>>>          }
>>>>>
>>>>> Essentially, my query does:
>>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
>>>>> new Text("index"), "mytable", 10, 10000); Collection<byte[]> data
>>>>> = getRowData(rows, "mytable", 10);
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>>> To: user@accumulo.apache.org
>>>>> Subject: Re: Improving Batchscanner Performance
>>>>>
>>>>> Hi David,
>>>>>
>>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>>>
>>>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you fetch each of those new Ranges with the Scanner before asking the BatchScanner for more. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>>>
>>>>> You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>>>
>>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>>>
>>>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>>>
>>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>>> Hey everyone,
>>>>>>
>>>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>>>> row              colFam          colQual         value
>>>>>> rowUID    --                     --                      byte[] of data
>>>>>>
>>>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>>>
>>>>>> // returns the data associated with the given collection of rows
>>>>>>           public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>>               List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>>               if (!rows.isEmpty()) {
>>>>>>                   BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>>                   List<Range> ranges = new ArrayList<Range>();
>>>>>>                   for (Text row : rows) {
>>>>>>                       ranges.add(new Range(row));
>>>>>>                   }
>>>>>>                   scanner.setRanges(ranges);
>>>>>>                   for (Map.Entry<Key, Value> entry : scanner) {
>>>>>>                       values.add(entry.getValue().get());
>>>>>>                   }
>>>>>>                   scanner.close();
>>>>>>               }
>>>>>>               return values;
>>>>>>           }
>>>>>>
>>>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
Inline'd this time

On 5/21/14, 12:58 PM, Slater, David M. wrote:
> You are correct that the "bin" is largely redundant. I created that because I was not guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed), and I'm not the one who specified it. There is another mechanism I didn't mention, which is that the bin is prepended by a timeblock (typically an hour span), and my data is streaming. So essentially, I create a number of splits for the next timeblock for X bins, and then when the data input moves into that time block it can ingest directly onto empty tablets.

Gotcha.

> I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and if I'm reading it correctly:

Oops, you're right, I think it was introduced in 1.5, but it's just a 
wrapper. You can invoke the PrintInfo class directly:

accumulo org.apache.accumulo.core.file.rfile.PrintInfo '/path/to/rfile.rf'

> 31;14006844|00 file:/t-0014fpy/A0014h4u.rf []    155454467,5450454
>
> This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin combination.
>
> After the data gets over a day old, I do a nightly job to merge the bins for each timeblock together, resulting in data like:
> 31;14000292|00 file:/t-0011bgk/C0011e06.rf []    1922144744,67390597
> 31;14000292|00 file:/t-0011bgk/C0011ed3.rf []    1942040855,68058489
>
> This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5% of the files, if I'm reading it correctly.

I think you're confused about what those numbers mean. The two numbers 
in the Value are size in bytes and number of entries, not data size and 
index size.

This means that your entries are about 30 bytes in size, which seems in
line with what you described given the encoding/compression Accumulo is
doing.

You could try playing with table.file.compress.blocksize. IIRC, if you 
reduce this value from the default 100k, you would get more blocks per 
RFile, which means that you get more index records, which, in turn, 
would mean you can find your records faster at the cost of a larger index.
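
If you want to experiment with that, the property can be changed per table from the shell (config -t mytable -s table.file.compress.blocksize=32K) or through the client API. A minimal sketch, with the 32K value chosen purely as an example, not a recommendation from this thread:

    import org.apache.accumulo.core.client.Connector;

    public class CompressBlockSizeExample {
        // Lower the data-block size for one table so its RFiles carry more index entries.
        // Only files written after the change (e.g. after a compaction) pick up the new size.
        static void shrinkBlockSize(Connector conn, String tableName) throws Exception {
            conn.tableOperations().setProperty(tableName, "table.file.compress.blocksize", "32K");
        }
    }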

> In total, there are about 440 tablets per server, with 4 servers, storing a total of about 2.1 TB of data (each server has a single 1 TB HDD).
>
> I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo to do that, or are bloom filters normally generated? I have an index cache of 256M for each tserver.

After a quick glance at the code, it appears that every time a Reader is 
opened to a file, we check the configuration and use a BloomFilter if 
enabled. I don't think you need to restart the tservers.

> Thanks,
> David
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Wednesday, May 21, 2014 12:18 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual for that matter).
>
> A few more questions that come to mind though...
>
> * What's the purpose of the "bin"? Your guid is likely random anyways which will give you uniformity (which is what a bin prefix like that is usually meant to provide).
>
> * How many splits do you have on this table? At least a few per tserver?
>
> You could also try looking at the size of the index for a couple of rfiles for your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster lookups than what you noted.
>
> On 5/20/14, 4:34 PM, Slater, David M. wrote:
>> 10-100 entries per node (4 nodes total).
>>
>> Would changing the data table structure change the batchscanner performance?
>>
>> I'm using:
>> row		colFam		colQual		value
>> bin|guid	--		--		byte[]
>>
>> would it be faster/slower to use:
>> row		colFam		colQual		value
>> bin		guid		--		byte[]
>>
>> The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 3:17 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
>>
>> A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for excessive RPCs; too large and, like you're seeing, it takes longer to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
>>
>> Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
>>
>> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>>> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan before any results come back at all. The ingest rate into this table is about 10k entries/s.
>>>
>>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>>
>>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 2:42 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>>>
>>> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you toward the cost of fetching random sets of rows and data.
>>>
>>> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few nodes (the monitor should help here)? It can sometimes be a bit of a balancing act between optimizing locality and avoiding hotspots.
>>>
>>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>>> Josh,
>>>>
>>>> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>>>
>>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>>>
>>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> -----Original Message-----
>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>>> To: user@accumulo.apache.org
>>>> Subject: Re: Improving Batchscanner Performance
>>>>
>>>> You actually stated it exactly here:
>>>>
>>>>      > I complete the first scan in its entirety
>>>>
>>>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>>
>>>>      > Collection<Text> rows = getRowIDs(new Range("minRow",
>>>> "maxRow"), new Text("index"), "mytable", 10, 10000);  >
>>>> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>>
>>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>>>
>>>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>>>
>>>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>>>
>>>>
>>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>>> Hi Josh,
>>>>>
>>>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>>>
>>>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>>>
>>>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>>>
>>>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>>              Set<Text> guids = new HashSet<Text>();
>>>>>              if (!ranges.isEmpty()) {
>>>>>                  BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>                  scanner.setRanges(ranges);
>>>>>                  scanner.fetchColumnFamily(term);
>>>>>                  for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                      guids.add(entry.getKey().getColumnQualifier());
>>>>>                      if (guids.size() > limit) {
>>>>>                          return null;
>>>>>                      }
>>>>>                  }
>>>>>                  scanner.close();
>>>>>              }
>>>>>              return guids;
>>>>>          }
>>>>>
>>>>> Essentially, my query does:
>>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
>>>>> new Text("index"), "mytable", 10, 10000); Collection<byte[]> data =
>>>>> getRowData(rows, "mytable", 10);
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>>> To: user@accumulo.apache.org
>>>>> Subject: Re: Improving Batchscanner Performance
>>>>>
>>>>> Hi David,
>>>>>
>>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>>>
>>>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you fetch each of those new Ranges with the Scanner before asking the BatchScanner for more. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>>>
>>>>> You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>>>
>>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>>>
>>>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>>>
>>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>>> Hey everyone,
>>>>>>
>>>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>>>> row     		colFam      	colQual     	value
>>>>>> rowUID  	 --          		--          		byte[] of data
>>>>>>
>>>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>>>
>>>>>> // returns the data associated with the given collection of rows
>>>>>>           public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>>               List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>>               if (!rows.isEmpty()) {
>>>>>>                   BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>>                   List<Range> ranges = new ArrayList<Range>();
>>>>>>                   for (Text row : rows) {
>>>>>>                       ranges.add(new Range(row));
>>>>>>                   }
>>>>>>                   scanner.setRanges(ranges);
>>>>>>                   for (Map.Entry<Key, Value> entry : scanner) {
>>>>>>                       values.add(entry.getValue().get());
>>>>>>                   }
>>>>>>                   scanner.close();
>>>>>>               }
>>>>>>               return values;
>>>>>>           }
>>>>>>
>>>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
You are correct that the "bin" is largely redundant. I created that because I was not guaranteed that the guid was uniformly random (I have seen some that aren't uniformly distributed), and I'm not the one who specified it. There is another mechanism I didn't mention, which is that the bin is prefixed with a timeblock (typically an hour span), and my data is streaming. So essentially, I create a number of splits for the next timeblock for X bins, and then when the data input moves into that timeblock it can ingest directly onto empty tablets.
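
For reference, a sketch of that pre-splitting step as it might look through the client API (numBins and the zero-padded bin format are placeholders for whatever is actually in use):

    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.io.Text;

    public class TimeblockPreSplit {
        // Before a new timeblock starts, add one split per bin so its ingest lands on empty tablets.
        static void preSplitNextTimeblock(Connector conn, String tableName, String nextTimeblock,
                                          int numBins) throws Exception {
            SortedSet<Text> splits = new TreeSet<Text>();
            for (int bin = 0; bin < numBins; bin++) {
                splits.add(new Text(nextTimeblock + "|" + String.format("%02d", bin)));
            }
            conn.tableOperations().addSplits(tableName, splits);
        }
    }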

I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and if I'm reading it correctly:
31;14006844|00 file:/t-0014fpy/A0014h4u.rf []    155454467,5450454

This is a 155 MB file with an index block of 5.45 MB. This is a typical size for a timeblock|bin combination.

After the data gets over a day old, I do a nightly job to merge the bins for each timeblock together, resulting in data like:
31;14000292|00 file:/t-0011bgk/C0011e06.rf []    1922144744,67390597
31;14000292|00 file:/t-0011bgk/C0011ed3.rf []    1942040855,68058489

This is about 4 GB with 140 MB of index. So it looks like the index size is about 3.5% of the files, if I'm reading it correctly.

In total, there are about 440 tablets per server, with 4 servers, storing a total of about 2.1 TB of data (each server has a single 1 TB HDD).

I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to restart Accumulo for them to take effect, or are bloom filters generated without a restart? I have an index cache of 256M for each tserver.
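
In case it helps, a small sketch for checking the bloom settings from the client without a restart, plus an optional compaction to rewrite files that were created before the property was enabled (the compaction step is an assumption about what you'd want here, not something suggested in this thread):

    import java.util.Map;

    import org.apache.accumulo.core.client.Connector;

    public class BloomCheck {
        static void checkAndRegenerateBlooms(Connector conn, String tableName) throws Exception {
            // Print the effective table.bloom.* settings to confirm the change took effect.
            for (Map.Entry<String, String> prop : conn.tableOperations().getProperties(tableName)) {
                if (prop.getKey().startsWith("table.bloom.")) {
                    System.out.println(prop.getKey() + " = " + prop.getValue());
                }
            }
            // Only files written after bloom filters were enabled contain the acu_bloom block;
            // a full compaction rewrites the existing files so they get one too.
            conn.tableOperations().compact(tableName, null, null, true, false);
        }
    }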

Thanks,
David

-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com] 
Sent: Wednesday, May 21, 2014 12:18 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

I wouldn't expect that you'd see much difference moving the guid to the colfam (or colqual for that matter).

A few more questions that come to mind though...

* What's the purpose of the "bin"? Your guid is likely random anyways which will give you uniformity (which is what a bin prefix like that is usually meant to provide).

* How many splits do you have on this table? At least a few per tserver?

You could also try looking at the size of the index for a couple of rfiles for your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think that you should have faster lookups than what you noted.

On 5/20/14, 4:34 PM, Slater, David M. wrote:
> 10-100 entries per node (4 nodes total).
>
> Would changing the data table structure change the batchscanner performance?
>
> I'm using:
> row		colFam		colQual		value
> bin|guid	--		--		byte[]
>
> would it be faster/slower to use:
> row		colFam		colQual		value
> bin		guid		--		byte[]
>
> The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 3:17 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
>
> A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for excessive RPCs; too large and, like you're seeing, it takes longer to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
>
> Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
>
> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan before any results come back at all. The ingest rate into this table is about 10k entries/s.
>>
>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>
>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 2:42 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>>
>> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you toward the cost of fetching random sets of rows and data.
>>
>> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few nodes (the monitor should help here)? It can sometimes be a bit of a balancing act between optimizing locality and avoiding hotspots.
>>
>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>> Josh,
>>>
>>> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>>
>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>>
>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>
>>> Thanks,
>>> David
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> You actually stated it exactly here:
>>>
>>>     > I complete the first scan in its entirety
>>>
>>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>
>>>     > Collection<Text> rows = getRowIDs(new Range("minRow", 
>>> "maxRow"), new Text("index"), "mytable", 10, 10000);  > 
>>> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>>>
>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>>
>>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>>
>>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>>
>>>
>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>> Hi Josh,
>>>>
>>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>>
>>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>>
>>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>>
>>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>             Set<Text> guids = new HashSet<Text>();
>>>>             if (!ranges.isEmpty()) {
>>>>                 BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>                 scanner.setRanges(ranges);
>>>>                 scanner.fetchColumnFamily(term);
>>>>                 for (Map.Entry<Key, Value> entry : scanner) {
>>>>                     guids.add(entry.getKey().getColumnQualifier());
>>>>                     if (guids.size() > limit) {
>>>>                         return null;
>>>>                     }
>>>>                 }
>>>>                 scanner.close();
>>>>             }
>>>>             return guids;
>>>>         }
>>>>
>>>> Essentially, my query does:
>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), 
>>>> new Text("index"), "mytable", 10, 10000); Collection<byte[]> data = 
>>>> getRowData(rows, "mytable", 10);
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>> To: user@accumulo.apache.org
>>>> Subject: Re: Improving Batchscanner Performance
>>>>
>>>> Hi David,
>>>>
>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>>
>>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you fetch each of those new Ranges with the Scanner before asking the BatchScanner for more. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>>
>>>> You could easily use an ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>>
>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>>
>>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>>
>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>> Hey everyone,
>>>>>
>>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>>> row     		colFam      	colQual     	value
>>>>> rowUID  	 --          		--          		byte[] of data
>>>>>
>>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>>
>>>>> // returns the data associated with the given collection of rows
>>>>>          public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>              List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>              if (!rows.isEmpty()) {
>>>>>                  BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>                  List<Range> ranges = new ArrayList<Range>();
>>>>>                  for (Text row : rows) {
>>>>>                      ranges.add(new Range(row));
>>>>>                  }
>>>>>                  scanner.setRanges(ranges);
>>>>>                  for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                      values.add(entry.getValue().get());
>>>>>                  }
>>>>>                  scanner.close();
>>>>>              }
>>>>>              return values;
>>>>>          }
>>>>>
>>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>>
>>>>> Thanks,
>>>>> David
>>>>>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
I wouldn't expect that you'd see much difference moving the guid to the 
colfam (or colqual for that matter).

A few more questions that come to mind though...

* What's the purpose of the "bin"? Your guid is likely random anyways 
which will give you uniformity (which is what a bin prefix like that is 
usually meant to provide).

* How many splits do you have on this table? At least a few per tserver?

You could also try looking at the size of the index for a couple of 
rfiles for your table (`bin/accumulo rfile-info 
'/hdfs/path/to/rfile.rf'`). I would think that you should have faster 
lookups than what you noted.

On 5/20/14, 4:34 PM, Slater, David M. wrote:
> 10-100 entries per node (4 nodes total).
>
> Would changing the data table structure change the batchscanner performance?
>
> I'm using:
> row		colFam		colQual		value
> bin|guid	--		--		byte[]
>
> would it be faster/slower to use:
> row		colFam		colQual		value
> bin		guid		--		byte[]
>
> The difference would be that the first would include everything as a Collection of ranges, where the second would use a combination of ranges and setting column families.
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 3:17 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> 10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?
>
> A value of 1M would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for excessive RPCs; too large and, like you're seeing, it takes longer to get the first batch. You should be able to reduce that value and see a much quicker first result come out of the batchscanner.
>
> Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.
>
> On 5/20/14, 3:08 PM, Slater, David M. wrote:
>> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan before any results come back at all. The ingest rate into this table is about 10k entries/s.
>>
>> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>>
>> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 2:42 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>>
>> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, I was trying to lead you toward the cost of fetching random sets of rows and data.
>>
>> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers, or are you hot-spotted on a few nodes (the monitor should help here)? It can sometimes be a bit of a balancing act between optimizing locality and avoiding hotspots.
>>
>> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>>> Josh,
>>>
>>> The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>>
>>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>>
>>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>>
>>> Thanks,
>>> David
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 1:59 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> You actually stated it exactly here:
>>>
>>>     > I complete the first scan in its entirety
>>>
>>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>>
>>>     > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
>>> new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data
>>> = getRowData(rows, "mytable", 10);
>>>
>>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>>
>>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>>
>>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data, when you could amortize the cost if you fetched the data for rows that are close together? A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>>
>>>
>>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>>> Hi Josh,
>>>>
>>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>>
>>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>>
>>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:
>>>>
>>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>>             Set<Text> guids = new HashSet<Text>();
>>>>             if (!ranges.isEmpty()) {
>>>>                 BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>                 scanner.setRanges(ranges);
>>>>                 scanner.fetchColumnFamily(term);
>>>>                 for (Map.Entry<Key, Value> entry : scanner) {
>>>>                     guids.add(entry.getKey().getColumnQualifier());
>>>>                     if (guids.size() > limit) {
>>>>                         return null;
>>>>                     }
>>>>                 }
>>>>                 scanner.close();
>>>>             }
>>>>             return guids;
>>>>         }
>>>>
>>>> Essentially, my query does:
>>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new
>>>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data =
>>>> getRowData(rows, "mytable", 10);
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>>> To: user@accumulo.apache.org
>>>> Subject: Re: Improving Batchscanner Performance
>>>>
>>>> Hi David,
>>>>
>>>> Absolutely. What you have here is a classic producer-consumer model.
>>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>>
>>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>>
>>>> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>>
>>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>>
>>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>>
>>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>>> Hey everyone,
>>>>>
>>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>>> row     		colFam      	colQual     	value
>>>>> rowUID  	 --          		--          		byte[] of data
>>>>>
>>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>>
>>>>> // returns the data associated with the given collection of rows
>>>>>          public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>>              List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>>              if (!rows.isEmpty()) {
>>>>>                  BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>>                  List<Range> ranges = new ArrayList<Range>();
>>>>>                  for (Text row : rows) {
>>>>>                      ranges.add(new Range(row));
>>>>>                  }
>>>>>                  scanner.setRanges(ranges);
>>>>>                  for (Map.Entry<Key, Value> entry : scanner) {
>>>>>                      values.add(entry.getValue().get());
>>>>>                  }
>>>>>                  scanner.close();
>>>>>              }
>>>>>              return values;
>>>>>          }
>>>>>
>>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>>
>>>>> Thanks,
>>>>> David
>>>>>

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
10-100 entries per node (4 nodes total).

Would changing the data table structure change the batchscanner performance?

I'm using:
row		colFam		colQual		value
bin|guid	--		--		byte[]

would it be faster/slower to use:
row		colFam		colQual		value
bin		guid		--		byte[]

The difference would be that the first would include everything as a Collection of ranges, whereas the second would use a combination of ranges and fetched column families.
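
For the second layout, the lookup would presumably look something like the
following (untested sketch -- "bins" and "guids" stand in for whatever came
out of the index scan, everything else is the same setup as getRowData):

    List<Range> ranges = new ArrayList<Range>();
    for (Text bin : bins) {
        ranges.add(new Range(bin));          // one range per bin row
    }
    BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
    scanner.setRanges(ranges);
    for (Text guid : guids) {
        scanner.fetchColumnFamily(guid);     // only pull back the guids of interest
    }
    List<byte[]> values = new ArrayList<byte[]>(guids.size());
    for (Map.Entry<Key, Value> entry : scanner) {
        values.add(entry.getValue().get());
    }
    scanner.close();

One caveat I can see: with tens of thousands of guids, that many column
family filters on a single scanner may not be cheap either, so I'd treat
this as something to measure rather than an obvious win.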

-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com] 
Sent: Tuesday, May 20, 2014 3:17 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

10-100 entries/s seems slow, but that's mostly a gut feeling without context. Is this over more than one node? 10s of nodes?

A value of 1M would would explain the pause that you see in the beginning. That parameter controls the size of the buffer that each tserver will fill before sending data back to the BatchScanner. Too small and you pay for the excessive RPCs, too large, and like you're seeing, it takes longer for you to get the first batch. You should be able to reduce that value and see a much quick first result come out of the batchscanner.

Number of rfiles could impact read performance as you have to do a merged-read over all of the rfiles for a tablet.

On 5/20/14, 3:08 PM, Slater, David M. wrote:
> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
>
> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>
> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 2:42 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>
> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, fetching random sets of rows and data is what I was trying to lead you to.
>
> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers or hot-spotted on a few number (the monitor should help here)? It can sometimes be a bit of a balancing act with optimizing locality while avoid suffering from hotspots.
>
> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>> Josh,
>>
>> The data is not significantly larger than the rows that I'm fetching. in terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>
>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>
>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>
>> Thanks,
>> David
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 1:59 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> You actually stated it exactly here:
>>
>>    > I complete the first scan in its entirety
>>
>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>
>>    > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), 
>> new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data 
>> = getRowData(rows, "mytable", 10);
>>
>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>
>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>
>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data when you could ammortize the cost if you fetched the data for rows that are close together. A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>
>>
>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>> Hi Josh,
>>>
>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>
>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>
>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>>>
>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>            Set<Text> guids = new HashSet<Text>();
>>>            if (!ranges.isEmpty()) {
>>>                BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>                scanner.setRanges(ranges);
>>>                scanner.fetchColumnFamily(term);
>>>                for (Map.Entry<Key, Value> entry : scanner) {
>>>                    guids.add(entry.getKey().getColumnQualifier());
>>>                    if (guids.size() > limit) {
>>>                        return null;
>>>                    }
>>>                }
>>>                scanner.close();
>>>            }
>>>            return guids;
>>>        }
>>>
>>> Essentially, my query does:
>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new 
>>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data = 
>>> getRowData(rows, "mytable", 10);
>>>
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> Hi David,
>>>
>>> Absolutely. What you have here is a classic producer-consumer model.
>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>
>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>
>>> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>
>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>
>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>
>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>> Hey everyone,
>>>>
>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>> row     		colFam      	colQual     	value
>>>> rowUID  	 --          		--          		byte[] of data
>>>>
>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>
>>>> // returns the data associated with the given collection of rows
>>>>         public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>             List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>             if (!rows.isEmpty()) {
>>>>                 BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>                 List<Range> ranges = new ArrayList<Range>();
>>>>                 for (Text row : rows) {
>>>>                     ranges.add(new Range(row));
>>>>                 }
>>>>                 scanner.setRanges(ranges);
>>>>                 for (Map.Entry<Key, Value> entry : scanner) {
>>>>                     values.add(entry.getValue().get());
>>>>                 }
>>>>                 scanner.close();
>>>>             }
>>>>             return values;
>>>>         }
>>>>
>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>
>>>> Thanks,
>>>> David
>>>>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
10-100 entries/s seems slow, but that's mostly a gut feeling without 
context. Is this over more than one node? 10s of nodes?

A value of 1M would explain the pause that you see in the beginning. 
That parameter controls the size of the buffer that each tserver will 
fill before sending data back to the BatchScanner. Too small and you 
pay for excessive RPCs; too large and, like you're seeing, it takes 
longer for you to get the first batch. You should be able to reduce 
that value and see a much quicker first result come out of the 
batchscanner.

Number of rfiles could impact read performance as you have to do a 
merged-read over all of the rfiles for a tablet.
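
If you want to experiment, both knobs are reachable from the client API 
(the shell works too); the 128K below is just an example of "smaller than 
1M", not a recommendation:

    // Shrink the per-tserver scan buffer so the first batch comes back sooner.
    conn.tableOperations().setProperty(tablename, "table.scan.max.memory", "128K");
    // Major-compact the table to cut down the number of rfiles each scan has
    // to merge over (flush memory first, wait for the compaction to finish).
    conn.tableOperations().compact(tablename, null, null, true, true);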

On 5/20/14, 3:08 PM, Slater, David M. wrote:
> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan to actually have any positive query number. The ingest rate into this table is about 10k entries/s.
>
> I don't think this would be a problem with table.scan.max.memory=1M, would it?
>
> Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 2:42 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.
>
> The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, fetching random sets of rows and data is what I was trying to lead you to.
>
> If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers or hot-spotted on a few number (the monitor should help here)? It can sometimes be a bit of a balancing act with optimizing locality while avoid suffering from hotspots.
>
> On 5/20/14, 2:24 PM, Slater, David M. wrote:
>> Josh,
>>
>> The data is not significantly larger than the rows that I'm fetching. in terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>>
>> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>>
>> Does the BatchScanner fetch rows based on the ordering of the Collection?
>>
>> Thanks,
>> David
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 1:59 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> You actually stated it exactly here:
>>
>>    > I complete the first scan in its entirety
>>
>> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>>
>>    > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
>> new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data =
>> getRowData(rows, "mytable", 10);
>>
>> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>>
>> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>>
>> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data when you could ammortize the cost if you fetched the data for rows that are close together. A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>>
>>
>> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>>> Hi Josh,
>>>
>>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>>
>>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>>
>>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>>>
>>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>>            Set<Text> guids = new HashSet<Text>();
>>>            if (!ranges.isEmpty()) {
>>>                BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>                scanner.setRanges(ranges);
>>>                scanner.fetchColumnFamily(term);
>>>                for (Map.Entry<Key, Value> entry : scanner) {
>>>                    guids.add(entry.getKey().getColumnQualifier());
>>>                    if (guids.size() > limit) {
>>>                        return null;
>>>                    }
>>>                }
>>>                scanner.close();
>>>            }
>>>            return guids;
>>>        }
>>>
>>> Essentially, my query does:
>>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new
>>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data =
>>> getRowData(rows, "mytable", 10);
>>>
>>>
>>> -----Original Message-----
>>> From: Josh Elser [mailto:josh.elser@gmail.com]
>>> Sent: Tuesday, May 20, 2014 1:32 PM
>>> To: user@accumulo.apache.org
>>> Subject: Re: Improving Batchscanner Performance
>>>
>>> Hi David,
>>>
>>> Absolutely. What you have here is a classic producer-consumer model.
>>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>>
>>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>>
>>> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>>
>>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>>
>>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>>
>>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>>> Hey everyone,
>>>>
>>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>>> row     		colFam      	colQual     	value
>>>> rowUID  	 --          		--          		byte[] of data
>>>>
>>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>>
>>>> // returns the data associated with the given collection of rows
>>>>         public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>>             List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>>             if (!rows.isEmpty()) {
>>>>                 BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>>                 List<Range> ranges = new ArrayList<Range>();
>>>>                 for (Text row : rows) {
>>>>                     ranges.add(new Range(row));
>>>>                 }
>>>>                 scanner.setRanges(ranges);
>>>>                 for (Map.Entry<Key, Value> entry : scanner) {
>>>>                     values.add(entry.getValue().get());
>>>>                 }
>>>>                 scanner.close();
>>>>             }
>>>>             return values;
>>>>         }
>>>>
>>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>>
>>>> Thanks,
>>>> David
>>>>

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
> I'm getting query results around 10-100 entries/s. However, it takes some time after starting the data scan before the query rate shows anything above zero. The ingest rate into this table is about 10k entries/s.

I don't think this would be a problem with table.scan.max.memory=1M, would it? 

Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest is overwhelming the resources?

-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com] 
Sent: Tuesday, May 20, 2014 2:42 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

No, that is how it's done. The ranges that you provide to the BatchScanner are binned to tablets hosted by tabletserver. It will then query up to numQueryThreads tservers at once to fetch results in parallel.

The point I was making is that you can only bin ranges within the scope of a single BatchScanner, and if you were making repeated calls to your original function with differing arguments, you might be incurring some more penalty. Like Bob, fetching random sets of rows and data is what I was trying to lead you to.

If the bandwidth of fetching the data is not a factor, I would probably agree that random reads are an issue. Do you have more details you can give about how long it takes to fetch the data for N rows (e.g. number of key-values/second and/or amount of data/second)? Are you getting an even distribution across your tservers or hot-spotted on a few number (the monitor should help here)? It can sometimes be a bit of a balancing act with optimizing locality while avoid suffering from hotspots.

On 5/20/14, 2:24 PM, Slater, David M. wrote:
> Josh,
>
> The data is not significantly larger than the rows that I'm fetching. in terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>
> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>
> Does the BatchScanner fetch rows based on the ordering of the Collection?
>
> Thanks,
> David
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 1:59 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> You actually stated it exactly here:
>
>   > I complete the first scan in its entirety
>
> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>
>   > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), 
> new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data = 
> getRowData(rows, "mytable", 10);
>
> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>
> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>
> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data when you could ammortize the cost if you fetched the data for rows that are close together. A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>
>
> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>> Hi Josh,
>>
>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>
>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>
>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>>
>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>           Set<Text> guids = new HashSet<Text>();
>>           if (!ranges.isEmpty()) {
>>               BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>               scanner.setRanges(ranges);
>>               scanner.fetchColumnFamily(term);
>>               for (Map.Entry<Key, Value> entry : scanner) {
>>                   guids.add(entry.getKey().getColumnQualifier());
>>                   if (guids.size() > limit) {
>>                       return null;
>>                   }
>>               }
>>               scanner.close();
>>           }
>>           return guids;
>>       }
>>
>> Essentially, my query does:
>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new 
>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data = 
>> getRowData(rows, "mytable", 10);
>>
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 1:32 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> Hi David,
>>
>> Absolutely. What you have here is a classic producer-consumer model.
>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>
>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>
>> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>
>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>
>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>
>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>> Hey everyone,
>>>
>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>> row     		colFam      	colQual     	value
>>> rowUID  	 --          		--          		byte[] of data
>>>
>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>
>>> // returns the data associated with the given collection of rows
>>>        public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>            List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>            if (!rows.isEmpty()) {
>>>                BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>                List<Range> ranges = new ArrayList<Range>();
>>>                for (Text row : rows) {
>>>                    ranges.add(new Range(row));
>>>                }
>>>                scanner.setRanges(ranges);
>>>                for (Map.Entry<Key, Value> entry : scanner) {
>>>                    values.add(entry.getValue().get());
>>>                }
>>>                scanner.close();
>>>            }
>>>            return values;
>>>        }
>>>
>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>
>>> Thanks,
>>> David
>>>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
No, that is how it's done. The ranges that you provide to the 
BatchScanner are binned to tablets hosted by tabletservers. It will then 
query up to numQueryThreads tservers at once to fetch results in parallel.

The point I was making is that you can only bin ranges within the scope 
of a single BatchScanner, and if you were making repeated calls to your 
original function with differing arguments, you might be incurring some 
more penalty. Like Bob, I was trying to lead you to the cost of fetching 
random sets of rows and data.

If the bandwidth of fetching the data is not a factor, I would probably 
agree that random reads are an issue. Do you have more details you can 
give about how long it takes to fetch the data for N rows (e.g. number 
of key-values/second and/or amount of data/second)? Are you getting an 
even distribution across your tservers, or are you hot-spotted on a 
small number of them (the monitor should help here)? It can sometimes 
be a bit of a balancing act between optimizing locality and avoiding 
hotspots.
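
To get those numbers, timing the loop you already have in getRowData would 
be enough -- a rough sketch, reusing the same scanner and values list:

    long startMs = System.currentTimeMillis();
    long entries = 0;
    long bytes = 0;
    for (Map.Entry<Key, Value> entry : scanner) {
        entries++;
        bytes += entry.getValue().getSize();   // payload bytes only, not the key
        values.add(entry.getValue().get());
    }
    long elapsedMs = Math.max(1, System.currentTimeMillis() - startMs);
    System.out.printf("%d entries (%d bytes) in %d ms -> %.1f entries/s, %.1f KB/s%n",
            entries, bytes, elapsedMs,
            entries * 1000.0 / elapsedMs, (bytes / 1024.0) * (1000.0 / elapsedMs));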

On 5/20/14, 2:24 PM, Slater, David M. wrote:
> Josh,
>
> The data is not significantly larger than the rows that I'm fetching. in terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.
>
> I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way.
>
> Does the BatchScanner fetch rows based on the ordering of the Collection?
>
> Thanks,
> David
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 1:59 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> You actually stated it exactly here:
>
>   > I complete the first scan in its entirety
>
> Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.
>
>   > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data = getRowData(rows, "mytable", 10);
>
> Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.
>
> Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.
>
> You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data when you could ammortize the cost if you fetched the data for rows that are close together. A large amount of data being returned is likely going to trump the additional cost in talking to many servers.
>
>
> On 5/20/14, 1:51 PM, Slater, David M. wrote:
>> Hi Josh,
>>
>> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>>
>> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>>
>> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>>
>> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>>           Set<Text> guids = new HashSet<Text>();
>>           if (!ranges.isEmpty()) {
>>               BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>               scanner.setRanges(ranges);
>>               scanner.fetchColumnFamily(term);
>>               for (Map.Entry<Key, Value> entry : scanner) {
>>                   guids.add(entry.getKey().getColumnQualifier());
>>                   if (guids.size() > limit) {
>>                       return null;
>>                   }
>>               }
>>               scanner.close();
>>           }
>>           return guids;
>>       }
>>
>> Essentially, my query does:
>> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new
>> Text("index"), "mytable", 10, 10000); Collection<byte[]> data =
>> getRowData(rows, "mytable", 10);
>>
>>
>> -----Original Message-----
>> From: Josh Elser [mailto:josh.elser@gmail.com]
>> Sent: Tuesday, May 20, 2014 1:32 PM
>> To: user@accumulo.apache.org
>> Subject: Re: Improving Batchscanner Performance
>>
>> Hi David,
>>
>> Absolutely. What you have here is a classic producer-consumer model.
>> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>>
>> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>>
>> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>>
>> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>>
>> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>>
>> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>>> Hey everyone,
>>>
>>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>>> row     		colFam      	colQual     	value
>>> rowUID  	 --          		--          		byte[] of data
>>>
>>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>>
>>> // returns the data associated with the given collection of rows
>>>        public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>>            List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>>            if (!rows.isEmpty()) {
>>>                BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>>                List<Range> ranges = new ArrayList<Range>();
>>>                for (Text row : rows) {
>>>                    ranges.add(new Range(row));
>>>                }
>>>                scanner.setRanges(ranges);
>>>                for (Map.Entry<Key, Value> entry : scanner) {
>>>                    values.add(entry.getValue().get());
>>>                }
>>>                scanner.close();
>>>            }
>>>            return values;
>>>        }
>>>
>>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>>
>>> Thanks,
>>> David
>>>

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
Josh,

The data is not significantly larger than the rows that I'm fetching. In terms of bandwidth, the data returned is at least 2 orders of magnitude smaller than the ingest rate, so I don't think it's a network issue.

I'm guessing, as Bob suggested, that it has to do with fetching a "random" set of rows each time. I had assumed that the batchscanner would take the Collection of ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on tablet splits. I'm guessing, based on the discussion, that it is not done that way. 

Does the BatchScanner fetch rows based on the ordering of the Collection?

Thanks,
David

-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com] 
Sent: Tuesday, May 20, 2014 1:59 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

You actually stated it exactly here:

 > I complete the first scan in its entirety

Loading the data into a Collection also implies that you're loading the complete set of rows and blocking until you find all rows, or until you fetch all of the data.

 > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);  > Collection<byte[]> data = getRowData(rows, "mytable", 10);

Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". The client talks to server(s), reads some data and returns it to you. By virtue of you loading these results from the Iterator into a Collection, you are consuming *all* results before proceeding to fetch the data for the rows.

Now, if, like you said, looking up the rows is drastically faster than fetching the data, there's a question as to why this is. Is it safe to assume that the data is much larger than the rows you're fetching? Have you tried to see what the throughput of fetching this data is? If it's bounded by network speed, you could try compressing the data in an iterator server-side before returning it to the client.

You could also consider the locality of the rows that you're fetching -- are you fetching a "random" set of rows each time and paying a penalty of talking to each server to fetch the data when you could ammortize the cost if you fetched the data for rows that are close together. A large amount of data being returned is likely going to trump the additional cost in talking to many servers.


On 5/20/14, 1:51 PM, Slater, David M. wrote:
> Hi Josh,
>
> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>
> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>
> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>
> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>          Set<Text> guids = new HashSet<Text>();
>          if (!ranges.isEmpty()) {
>              BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>              scanner.setRanges(ranges);
>              scanner.fetchColumnFamily(term);
>              for (Map.Entry<Key, Value> entry : scanner) {
>                  guids.add(entry.getKey().getColumnQualifier());
>                  if (guids.size() > limit) {
>                      return null;
>                  }
>              }
>              scanner.close();
>          }
>          return guids;
>      }
>
> Essentially, my query does:
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new 
> Text("index"), "mytable", 10, 10000); Collection<byte[]> data = 
> getRowData(rows, "mytable", 10);
>
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 1:32 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> Hi David,
>
> Absolutely. What you have here is a classic producer-consumer model.
> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>
> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>
> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>
> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>
> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>
> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>> Hey everyone,
>>
>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>> row     		colFam      	colQual     	value
>> rowUID  	 --          		--          		byte[] of data
>>
>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>
>> // returns the data associated with the given collection of rows
>>       public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>           List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>           if (!rows.isEmpty()) {
>>               BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>               List<Range> ranges = new ArrayList<Range>();
>>               for (Text row : rows) {
>>                   ranges.add(new Range(row));
>>               }
>>               scanner.setRanges(ranges);
>>               for (Map.Entry<Key, Value> entry : scanner) {
>>                   values.add(entry.getValue().get());
>>               }
>>               scanner.close();
>>           }
>>           return values;
>>       }
>>
>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>
>> Thanks,
>> David
>>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
You actually stated it exactly here:

 > I complete the first scan in its entirety

Loading the data into a Collection also implies that you're loading the 
complete set of rows and blocking until you find all rows, or until you 
fetch all of the data.

 > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new 
Text("index"), "mytable", 10, 10000);
 > Collection<byte[]> data = getRowData(rows, "mytable", 10);

Both the BatchScanner and Scanner are returning KeyValue pairs in 
"batches". The client talks to server(s), reads some data and returns it 
to you. By virtue of you loading these results from the Iterator into a 
Collection, you are consuming *all* results before proceeding to fetch 
the data for the rows.

Now, if, like you said, looking up the rows is drastically faster than 
fetching the data, there's a question as to why this is. Is it safe to 
assume that the data is much larger than the rows you're fetching? Have 
you tried to see what the throughput of fetching this data is? If it's 
bounded by network speed, you could try compressing the data in an 
iterator server-side before returning it to the client.

You could also consider the locality of the rows that you're fetching -- 
are you fetching a "random" set of rows each time and paying a penalty 
of talking to each server to fetch the data when you could amortize the 
cost if you fetched the data for rows that are close together. A large 
amount of data being returned is likely going to trump the additional 
cost in talking to many servers.


On 5/20/14, 1:51 PM, Slater, David M. wrote:
> Hi Josh,
>
> I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.
>
> The logic for upperbounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.
>
> As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety, which  before entering this method with the collection of Text rowIDs. The method for that is:
>
> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>          Set<Text> guids = new HashSet<Text>();
>          if (!ranges.isEmpty()) {
>              BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>              scanner.setRanges(ranges);
>              scanner.fetchColumnFamily(term);
>              for (Map.Entry<Key, Value> entry : scanner) {
>                  guids.add(entry.getKey().getColumnQualifier());
>                  if (guids.size() > limit) {
>                      return null;
>                  }
>              }
>              scanner.close();
>          }
>          return guids;
>      }
>
> Essentially, my query does:
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 1:32 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> Hi David,
>
> Absolutely. What you have here is a classic producer-consumer model.
> Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.
>
> The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?
>
> You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.
>
> Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?
>
> One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.
>
> On 5/20/14, 1:10 PM, Slater, David M. wrote:
>> Hey everyone,
>>
>> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
>> row     		colFam      	colQual     	value
>> rowUID  	 --          		--          		byte[] of data
>>
>> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>>
>> // returns the data associated with the given collection of rows
>>       public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>>           List<byte[]> values = new ArrayList<byte[]>(rows.size());
>>           if (!rows.isEmpty()) {
>>               BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>>               List<Range> ranges = new ArrayList<Range>();
>>               for (Text row : rows) {
>>                   ranges.add(new Range(row));
>>               }
>>               scanner.setRanges(ranges);
>>               for (Map.Entry<Key, Value> entry : scanner) {
>>                   values.add(entry.getValue().get());
>>               }
>>               scanner.close();
>>           }
>>           return values;
>>       }
>>
>> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>>
>> Thanks,
>> David
>>

Re: Improving Batchscanner Performance

Posted by William Slacum <wi...@accumulo.net>.
By "blocking", we mean you have to complete the entire index look up before
fetching your records.

Conceptually, instead of returning a `Collection<Text> rows`, return an
`Iterator<Text> rows` and consume them in batches as the first look up
produces them. That way record look ups can occur in parallel with index
look ups.
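
A rough, untested sketch of the consuming side (batch size and the method
shape are made up, and it assumes the same `conn` field and table setup from
your snippets; the Iterator would just wrap the index BatchScanner's
iterator, mapping each entry to its column qualifier):

public Collection<byte[]> getRowData(Iterator<Text> rows, String tablename, int queryThreads, int batchSize) throws TableNotFoundException {
    List<byte[]> values = new ArrayList<byte[]>();
    List<Range> batch = new ArrayList<Range>(batchSize);
    while (rows.hasNext()) {
        batch.add(new Range(rows.next()));
        // flush a full batch, or the final partial batch, while the index scan keeps producing
        if (batch.size() == batchSize || !rows.hasNext()) {
            BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
            scanner.setRanges(batch);
            for (Map.Entry<Key, Value> entry : scanner) {
                values.add(entry.getValue().get());
            }
            scanner.close();
            batch.clear();
        }
    }
    return values;
}

Note this drops the HashSet de-duplication from getRowIDs, so if the index
can return the same rowID more than once you would either accept the
duplicate lookups or de-dup within each batch.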


On Tue, May 20, 2014 at 1:51 PM, Slater, David M.
<Da...@jhuapl.edu> wrote:

> Hi Josh,
>
> I should have clarified - I am using a batchscanner for both lookups. I
> had thought of putting it into two different threads, but the first scan is
> typically an order of magnitude faster than the second.
>
> The logic for upperbounding the results returned is outside of the method
> I provided. Since there is a one-to-one relationship between rowIDs and
> records on the second scan, I just limit the number of rows I send to this
> method.
>
> As for blocking, I'm not sure exactly what you mean. I complete the first
> scan in its entirety, which  before entering this method with the
> collection of Text rowIDs. The method for that is:
>
> public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
>         Set<Text> guids = new HashSet<Text>();
>         if (!ranges.isEmpty()) {
>             BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>             scanner.setRanges(ranges);
>             scanner.fetchColumnFamily(term);
>             for (Map.Entry<Key, Value> entry : scanner) {
>                 guids.add(entry.getKey().getColumnQualifier());
>                 if (guids.size() > limit) {
>                     return null;
>                 }
>             }
>             scanner.close();
>         }
>         return guids;
>     }
>
> Essentially, my query does:
> Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
> Collection<byte[]> data = getRowData(rows, "mytable", 10);
>
>
> -----Original Message-----
> From: Josh Elser [mailto:josh.elser@gmail.com]
> Sent: Tuesday, May 20, 2014 1:32 PM
> To: user@accumulo.apache.org
> Subject: Re: Improving Batchscanner Performance
>
> Hi David,
>
> Absolutely. What you have here is a classic producer-consumer model.
> Your BatchScanner is producing results, which you then consume by your
> scanner, and ultimately return those results to the client.
>
> The problem with your below implementation is that you're not going to be
> polling your batchscanner as aggressively as you could be. You are blocking
> while you can fetch each of those new Ranges from the Scanner before
> fetching new ranges. Have you considered splitting up the BatchScanner and
> Scanner code into two different threads?
>
> You could easily use a ArrayBlockingQueue (or similar) to pass results
> from the BatchScanner to the Scanner. I would imagine that this would give
> you a fair improvement in performance.
>
> Also, it doesn't appear that there's a reason you can't use a BatchScanner
> for both lookups?
>
> One final warning, your current implementation could also hog heap very
> badly if your batchscanner returns too many records. The producer/consumer
> I proposed should help here a little bit, but you should still be asserting
> upper-bounds to avoid running out of heap space in your client.
>
> On 5/20/14, 1:10 PM, Slater, David M. wrote:
> > Hey everyone,
> >
> > I'm trying to improve the query performance of batchscans on my data
> table. I first scan over index tables, which returns a set of rowIDs that
> correspond to the records I am interested in. This set of records is fairly
> randomly (and uniformly) distributed across a large number of tablets, due
> to the randomness of the UID and the query itself. Then I want to scan over
> my data table, which is setup as follows:
> > row                   colFam          colQual         value
> > rowUID                --              --              byte[] of data
> >
> > These records are fairly small (100s of bytes), but numerous (I may
> return 50000 or more). The method I use to obtain this follows.
> Essentially, I turn the rows returned from the first query into a set of
> ranges to input into the batchscanner, and then return those rows,
> retrieving the value from them.
> >
> > // returns the data associated with the given collection of rows
> >      public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
> >          List<byte[]> values = new ArrayList<byte[]>(rows.size());
> >          if (!rows.isEmpty()) {
> >              BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
> >              List<Range> ranges = new ArrayList<Range>();
> >              for (Text row : rows) {
> >                  ranges.add(new Range(row));
> >              }
> >              scanner.setRanges(ranges);
> >              for (Map.Entry<Key, Value> entry : scanner) {
> >                  values.add(entry.getValue().get());
> >              }
> >              scanner.close();
> >          }
> >          return values;
> >      }
> >
> > Is there a more efficient way to do this? I have index caches and bloom
> filters enabled (data caches are not), but I still seem to have a long
> query lag. Any thoughts on how I can improve this?
> >
> > Thanks,
> > David
> >
>

RE: Improving Batchscanner Performance

Posted by "Slater, David M." <Da...@jhuapl.edu>.
Hi Josh,

I should have clarified - I am using a batchscanner for both lookups. I had thought of putting it into two different threads, but the first scan is typically an order of magnitude faster than the second.

The logic for upper-bounding the results returned is outside of the method I provided. Since there is a one-to-one relationship between rowIDs and records on the second scan, I just limit the number of rows I send to this method.

As for blocking, I'm not sure exactly what you mean. I complete the first scan in its entirety before entering this method with the collection of Text rowIDs. The method for that is:

public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename, int queryThreads, int limit) throws TableNotFoundException {
        Set<Text> guids = new HashSet<Text>();
        if (!ranges.isEmpty()) {
            BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
            scanner.setRanges(ranges);
            scanner.fetchColumnFamily(term);
            for (Map.Entry<Key, Value> entry : scanner) {
                guids.add(entry.getKey().getColumnQualifier());
                if (guids.size() > limit) {
                    scanner.close(); // close before the early return so the scanner's threads are released
                    return null;
                }
            }
            scanner.close();
        }
        return guids;
    }

Essentially, my query does:
Collection<Text> rows = getRowIDs(Collections.singleton(new Range("minRow", "maxRow")), new Text("index"), "mytable", 10, 10000);
Collection<byte[]> data = getRowData(rows, "mytable", 10);


-----Original Message-----
From: Josh Elser [mailto:josh.elser@gmail.com] 
Sent: Tuesday, May 20, 2014 1:32 PM
To: user@accumulo.apache.org
Subject: Re: Improving Batchscanner Performance

Hi David,

Absolutely. What you have here is a classic producer-consumer model. 
Your BatchScanner is producing results, which you then consume by your scanner, and ultimately return those results to the client.

The problem with your below implementation is that you're not going to be polling your batchscanner as aggressively as you could be. You are blocking while you can fetch each of those new Ranges from the Scanner before fetching new ranges. Have you considered splitting up the BatchScanner and Scanner code into two different threads?

You could easily use a ArrayBlockingQueue (or similar) to pass results from the BatchScanner to the Scanner. I would imagine that this would give you a fair improvement in performance.

Also, it doesn't appear that there's a reason you can't use a BatchScanner for both lookups?

One final warning, your current implementation could also hog heap very badly if your batchscanner returns too many records. The producer/consumer I proposed should help here a little bit, but you should still be asserting upper-bounds to avoid running out of heap space in your client.

On 5/20/14, 1:10 PM, Slater, David M. wrote:
> Hey everyone,
>
> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
> row     		colFam      	colQual     	value
> rowUID  	 --          		--          		byte[] of data
>
> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>
> // returns the data associated with the given collection of rows
>      public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>          List<byte[]> values = new ArrayList<byte[]>(rows.size());
>          if (!rows.isEmpty()) {
>              BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>              List<Range> ranges = new ArrayList<Range>();
>              for (Text row : rows) {
>                  ranges.add(new Range(row));
>              }
>              scanner.setRanges(ranges);
>              for (Map.Entry<Key, Value> entry : scanner) {
>                  values.add(entry.getValue().get());
>              }
>              scanner.close();
>          }
>          return values;
>      }
>
> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>
> Thanks,
> David
>

Re: Improving Batchscanner Performance

Posted by Josh Elser <jo...@gmail.com>.
Hi David,

Absolutely. What you have here is a classic producer-consumer model.
Your BatchScanner is producing results, which you then consume with your
scanner and ultimately return to the client.

The problem with the implementation below is that you're not going to
be polling your batchscanner as aggressively as you could be. You are
blocked fetching each of those new Ranges from the Scanner before you
can fetch any new ranges from the BatchScanner. Have you considered
splitting the BatchScanner and Scanner code into two different threads?

You could easily use an ArrayBlockingQueue (or similar) to pass results
from the BatchScanner to the Scanner. I would imagine that this would
give you a fair improvement in performance.

Also, it doesn't appear that there's a reason you can't use a 
BatchScanner for both lookups?

One final warning: your current implementation could also hog heap very
badly if your batchscanner returns too many records. The
producer/consumer I proposed should help here a little bit, but you
should still be asserting upper bounds to avoid running out of heap
space in your client.
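
A rough, untested sketch of how those pieces could fit together (queue
capacity, batch size, and the method/parameter names here are arbitrary; it
assumes the same conn field as your snippets plus the usual java.util,
java.util.concurrent and Accumulo client imports, and error handling is
simplified: if the producer fails before enqueueing the sentinel, the
consumer will block):

public Collection<byte[]> getRowDataPipelined(final Collection<Range> indexRanges, final Text term,
        final String indexTable, String dataTable, final int queryThreads)
        throws TableNotFoundException, InterruptedException {
    final BlockingQueue<Range> queue = new ArrayBlockingQueue<Range>(1000); // bounded, so it also caps client heap
    final Range done = new Range(); // sentinel marking the end of the index results

    // producer: push data-table ranges onto the queue as the index scan returns rowIDs
    Thread producer = new Thread(new Runnable() {
        public void run() {
            BatchScanner index = null;
            try {
                index = conn.createBatchScanner(indexTable, new Authorizations(), queryThreads);
                index.setRanges(indexRanges);
                index.fetchColumnFamily(term);
                for (Map.Entry<Key, Value> entry : index) {
                    queue.put(new Range(entry.getKey().getColumnQualifier()));
                }
                queue.put(done);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (index != null) {
                    index.close();
                }
            }
        }
    });
    producer.start();

    // consumer: drain the queue in batches and look up the data rows with a second BatchScanner
    List<byte[]> values = new ArrayList<byte[]>();
    List<Range> batch = new ArrayList<Range>();
    boolean finished = false;
    while (!finished) {
        Range r = queue.take();
        if (r == done) {
            finished = true;
        } else {
            batch.add(r);
        }
        if (!batch.isEmpty() && (batch.size() >= 500 || finished)) {
            BatchScanner data = conn.createBatchScanner(dataTable, new Authorizations(), queryThreads);
            data.setRanges(batch);
            for (Map.Entry<Key, Value> entry : data) {
                values.add(entry.getValue().get());
            }
            data.close();
            batch.clear();
        }
    }
    return values;
}

The bounded queue and the 500-range batches also cap how much sits in client
memory at once, which covers the heap concern above.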

On 5/20/14, 1:10 PM, Slater, David M. wrote:
> Hey everyone,
>
> I'm trying to improve the query performance of batchscans on my data table. I first scan over index tables, which returns a set of rowIDs that correspond to the records I am interested in. This set of records is fairly randomly (and uniformly) distributed across a large number of tablets, due to the randomness of the UID and the query itself. Then I want to scan over my data table, which is setup as follows:
> row     		colFam      	colQual     	value
> rowUID  	 --          		--          		byte[] of data
>
> These records are fairly small (100s of bytes), but numerous (I may return 50000 or more). The method I use to obtain this follows. Essentially, I turn the rows returned from the first query into a set of ranges to input into the batchscanner, and then return those rows, retrieving the value from them.
>
> // returns the data associated with the given collection of rows
>      public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename, int queryThreads) throws TableNotFoundException {
>          List<byte[]> values = new ArrayList<byte[]>(rows.size());
>          if (!rows.isEmpty()) {
>              BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
>              List<Range> ranges = new ArrayList<Range>();
>              for (Text row : rows) {
>                  ranges.add(new Range(row));
>              }
>              scanner.setRanges(ranges);
>              for (Map.Entry<Key, Value> entry : scanner) {
>                  values.add(entry.getValue().get());
>              }
>              scanner.close();
>          }
>          return values;
>      }
>
> Is there a more efficient way to do this? I have index caches and bloom filters enabled (data caches are not), but I still seem to have a long query lag. Any thoughts on how I can improve this?
>
> Thanks,
> David
>