Posted to user@cassandra.apache.org by "Brent N. Chun" <bn...@nutanix.com> on 2010/07/08 09:21:44 UTC

Reading all rows in a column family in parallel

Hello,

I'm running Cassandra 0.6.0 on a cluster and have an application that 
needs to read all rows from a column family using the Cassandra Thrift 
API. Ideally, I'd like to be able to do this by having all nodes in the 
cluster read in parallel (i.e., each node reads a disjoint set of rows 
that are stored locally). I should also mention that I'm using the 
RandomPartitioner.

Here's what I was thinking:

   1. Have one node invoke describe_ring to find the token range on the 
ring that each node is responsible for.

   2. For each token range, have the node that owns that portion of the 
ring read the rows in that range using a sequence of get_range_slices 
calls (using start/end tokens, not keys).
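
A minimal sketch of the planning half of the two steps above, assuming
describe_ring yields one (start token, end token, endpoints) entry per
range. ScanPlanner and RingRange are hypothetical names, not the
Thrift-generated classes, and treating the first listed endpoint as the
owner of a range is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ScanPlanner {
    /** Hypothetical stand-in for one describe_ring entry; not the Thrift-generated class. */
    static final class RingRange {
        final String startToken;      // exclusive lower bound
        final String endToken;        // inclusive upper bound
        final List<String> endpoints; // nodes holding this range

        RingRange(String startToken, String endToken, List<String> endpoints) {
            this.startToken = startToken;
            this.endToken = endToken;
            this.endpoints = endpoints;
        }
    }

    /** Groups ranges by first endpoint (assumed owner) so each node scans only its own ranges. */
    static Map<String, List<RingRange>> planByOwner(List<RingRange> ring) {
        Map<String, List<RingRange>> plan = new LinkedHashMap<String, List<RingRange>>();
        for (RingRange r : ring) {
            String owner = r.endpoints.get(0); // assumption: first endpoint is the owner
            List<RingRange> mine = plan.get(owner);
            if (mine == null) {
                mine = new ArrayList<RingRange>();
                plan.put(owner, mine);
            }
            mine.add(r);
        }
        return plan;
    }

    public static void main(String[] args) {
        // Made-up three-range ring on two nodes; the last range wraps past the maximum token.
        List<RingRange> ring = Arrays.asList(
            new RingRange("0", "100", Arrays.asList("10.0.0.1")),
            new RingRange("100", "200", Arrays.asList("10.0.0.2")),
            new RingRange("200", "0", Arrays.asList("10.0.0.1")));
        for (Map.Entry<String, List<RingRange>> e : planByOwner(ring).entrySet()) {
            for (RingRange r : e.getValue()) {
                System.out.println(e.getKey() + " scans (" + r.startToken + ", " + r.endToken + "]");
            }
        }
    }
}
```

Each node would then run its own sequence of token-based range queries
over the ranges assigned to it.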

This type of functionality seems to already be there in the tree with 
the recent Cassandra/Hadoop integration.

...
KeyRange keyRange = new KeyRange(batchRowCount)
        .setStart_token(startToken)
        .setEnd_token(split.getEndToken());
try
{
    rows = client.get_range_slices(new ColumnParent(cfName),
                                   predicate,
                                   keyRange,
                                   ConsistencyLevel.ONE);
    ...

    // prepare for the next slice to be read
    KeySlice lastRow = rows.get(rows.size() - 1);
    IPartitioner p = DatabaseDescriptor.getPartitioner();
    byte[] rowkey = lastRow.getKey();
    startToken = p.getTokenFactory().toString(p.getToken(rowkey));
...

The above snippet from ColumnFamilyRecordReader.java seems to suggest it 
is possible to scan an entire column family by reading disjoint sets of 
rows using token-based range queries (as opposed to key-based range 
queries). Is this possible in 0.6.0? (Note: for the next startToken, I 
was just planning on computing the MD5 digest of the last key directly 
since I'm accessing Cassandra through Thrift.)
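
A hedged sketch of computing the next start token on the client side.
It assumes that RandomPartitioner derives a token by reading the
16-byte MD5 digest as a signed big-endian integer and taking its
absolute value; that is my understanding of the partitioner, not
something stated in this thread. TokenMath, tokenOf and foldUnsigned
are hypothetical names.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class TokenMath {
    static final BigInteger TWO_POW_128 = BigInteger.ONE.shiftLeft(128);

    /** Token under the stated assumption: |MD5 digest read as a signed two's-complement integer|. */
    static BigInteger tokenOf(byte[] key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key);
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Maps a digest that was read as an unsigned 128-bit value onto the signed-then-abs form. */
    static BigInteger foldUnsigned(BigInteger unsigned) {
        // Values with the top bit set are negative when read as signed; their magnitude is 2^128 - value.
        return unsigned.testBit(127) ? TWO_POW_128.subtract(unsigned) : unsigned;
    }

    public static void main(String[] args) {
        byte[] key = "M_my_key84".getBytes(StandardCharsets.UTF_8);
        System.out.println("token = " + tokenOf(key));
    }
}
```

If that assumption holds, a client that reads the digest as an unsigned
number gets a different value whenever the digest's top bit is set, so
the signed reading matters when the result is fed back as a start token.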

Thoughts?

bnc

Re: Reading all rows in a column family in parallel

Posted by "Brent N. Chun" <bn...@nutanix.com>.
Jonathan Ellis wrote:
> There have been a number of bug fixes to this since 0.6.0 -- as Thomas
> said, it works in 0.6.3.  (Although there is one related bug scheduled
> to be fixed in 0.6.4,
> https://issues.apache.org/jira/browse/CASSANDRA-1042)

Ah, this is exactly one of the cases I've been seeing! Thanks, Jonathan.

bnc

Re: Reading all rows in a column family in parallel

Posted by Jonathan Ellis <jb...@gmail.com>.
There have been a number of bug fixes to this since 0.6.0 -- as Thomas
said, it works in 0.6.3.  (Although there is one related bug scheduled
to be fixed in 0.6.4,
https://issues.apache.org/jira/browse/CASSANDRA-1042)

On Thu, Jul 8, 2010 at 2:06 PM, Brent N. Chun <bn...@nutanix.com> wrote:
> Hi Jonathan,
>
> The code snippet below was from the repository. I mentioned 0.6.0
> specifically just to confirm that reading a CF using token-based range
> queries with the RandomPartitioner should (or shouldn't) also work in that
> version. I've seen discussions about whether range queries are now supported
> with the RandomPartitioner for example. Moreover, those discussions mostly
> seem to involve key-based range queries, though, not token-based range
> queries like CFRR uses. If you're saying that this functionality essentially
> works for everyone but me in 0.6.0, then that implies I have a bug in my
> code which would be great news for me. What I'm essentially seeing is either
> all rows, all rows + duplicate rows, or missing rows even when using a
> single node. Which of these I get is entirely deterministic. If I delete all
> the data and insert the same rows, the ranges returned by describe_ring
> change but the end result of reading the CF is then one of those three
> cases.
>
> Thanks,
> bnc
>
> Jonathan Ellis wrote:
>>
>> "CFRR does this.  Is this possible?"
>>
>> I guess I don't understand the question. :)
>



-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Re: Reading all rows in a column family in parallel

Posted by "Brent N. Chun" <bn...@nutanix.com>.
Hi Jonathan,

The code snippet below was from the repository. I mentioned 0.6.0 
specifically just to confirm that reading a CF using token-based range 
queries with the RandomPartitioner should (or shouldn't) also work in 
that version. I've seen discussions about whether range queries are now 
supported with the RandomPartitioner for example. Moreover, those 
discussions mostly seem to involve key-based range queries, though, not 
token-based range queries like CFRR uses. If you're saying that this 
functionality essentially works for everyone but me in 0.6.0, then that 
implies I have a bug in my code which would be great news for me. What 
I'm essentially seeing is either all rows, all rows + duplicate rows, or 
missing rows even when using a single node. Which of these I get is 
entirely deterministic. If I delete all the data and insert the same 
rows, the ranges returned by describe_ring change but the end result of 
reading the CF is then one of those three cases.

Thanks,
bnc

Jonathan Ellis wrote:
> "CFRR does this.  Is this possible?"
> 
> I guess I don't understand the question. :)

Re: Reading all rows in a column family in parallel

Posted by Jonathan Ellis <jb...@gmail.com>.
"CFRR does this.  Is this possible?"

I guess I don't understand the question. :)

On Thu, Jul 8, 2010 at 2:21 AM, Brent N. Chun <bn...@nutanix.com> wrote:
> Hello,
>
> I'm running Cassandra 0.6.0 on a cluster and have an application that needs
> to read all rows from a column family using the Cassandra Thrift API.
> Ideally, I'd like to be able to do this by having all nodes in the cluster
> read in parallel (i.e., each node reads a disjoint set of rows that are
> stored locally). I should also mention that I'm using the RandomPartitioner.
>
> Here's what I was thinking:
>
>  1. Have one node invoke describe_ring to find the token range on the ring
> that each node is responsible for.
>
>  2. For each token range, have the node that owns that portion of the ring
> read the rows in that range using a sequence of get_range_slices calls
> (using start/end tokens, not keys).
>
> This type of functionality seems to already be there in the tree with the
> recent Cassandra/Hadoop integration.
>
> ...
> KeyRange keyRange = new KeyRange(batchRowCount)
>        .setStart_token(startToken)
>        .setEnd_token(split.getEndToken());
> try
> {
>    rows = client.get_range_slices(new ColumnParent(cfName),
>           predicate,
>           keyRange,
>           ConsistencyLevel.ONE);
>     ...
>
>    // prepare for the next slice to be read
>    KeySlice lastRow = rows.get(rows.size() - 1);
>    IPartitioner p = DatabaseDescriptor.getPartitioner();
>    byte[] rowkey = lastRow.getKey();
>    startToken = p.getTokenFactory().toString(p.getToken(rowkey));
> ...
>
> The above snippet from ColumnFamilyRecordReader.java seems to suggest it is
> possible to scan an entire column family by reading disjoint sets of rows
> using token-based range queries (as opposed to key-based range queries). Is
> this possible in 0.6.0? (Note: for the next startToken, I was just planning
> on computing the MD5 digest of the last key directly since I'm accessing
> Cassandra through Thrift.)
>
> Thoughts?
>
> bnc
>




Re: Reading all rows in a column family in parallel

Posted by "Brent N. Chun" <bn...@nutanix.com>.
Thomas Heller wrote:
> Hey,
> 
>> .... Is
>> this possible in 0.6.0? (Note: for the next startToken, I was just planning
>> on computing the MD5 digest of the last key directly since I'm accessing
>> Cassandra through Thrift.)
> 
> Can't speak for 0.6.0 but it works for 0.6.3.
> 
> Just implemented this in ruby (minus the parallel part).
> 
> Cheers,
> /thomas

Hm, I must be doing something fundamentally wrong then. I just tried 0.6.3, same 
result. In this example, I have a one-node system with 100 rows in a single 
CF. When I read it back using token-based range queries with the 
RandomPartitioner, I get the output below (only 33 of 100 rows returned).

Now the 100 rows have keys that hash to random points on the ring. In the 
example below, I'm reading rows in chunks of 20.

In the first range query, the initial range is the entire ring. The 20 rows 
returned seem to have MD5 hashes in no particular order and could be anywhere 
on the ring. I take the MD5 hash of the last row's key and use it as the start 
of the second range query.
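
The chunked read described above can be sketched against an in-memory
stand-in for a node. This is not the Thrift API: PagedScan, rangeSlice
and scan are hypothetical names, the TreeMap plays the role of rows
stored in token order, and the (start, end] wraparound semantics are an
assumption about how a token range is interpreted.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PagedScan {
    /** Returns up to limit rows with tokens in (start, end], wrapping around when start >= end. */
    static List<Map.Entry<BigInteger, String>> rangeSlice(
            TreeMap<BigInteger, String> ring, BigInteger start, BigInteger end, int limit) {
        List<Map.Entry<BigInteger, String>> out = new ArrayList<Map.Entry<BigInteger, String>>();
        if (start.compareTo(end) < 0) {
            take(ring.subMap(start, false, end, true), limit, out);
        } else {
            take(ring.tailMap(start, false), limit, out); // (start, max token]
            take(ring.headMap(end, true), limit, out);    // [min token, end]
        }
        return out;
    }

    private static void take(Map<BigInteger, String> part, int limit,
                             List<Map.Entry<BigInteger, String>> out) {
        for (Map.Entry<BigInteger, String> e : part.entrySet()) {
            if (out.size() >= limit) {
                return;
            }
            out.add(e);
        }
    }

    /** Reads all rows in (start, end] in chunks, resuming each chunk after the last token seen. */
    static List<String> scan(TreeMap<BigInteger, String> ring,
                             BigInteger start, BigInteger end, int chunk) {
        List<String> keys = new ArrayList<String>();
        BigInteger cursor = start;
        while (true) {
            List<Map.Entry<BigInteger, String>> rows = rangeSlice(ring, cursor, end, chunk);
            if (rows.isEmpty()) {
                break;
            }
            for (Map.Entry<BigInteger, String> e : rows) {
                keys.add(e.getValue());
            }
            cursor = rows.get(rows.size() - 1).getKey();
            if (cursor.equals(end)) {
                break; // (end, end] would otherwise be read as the whole ring again
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<BigInteger, String> ring = new TreeMap<BigInteger, String>();
        for (int t = 10; t <= 100; t += 10) {
            ring.put(BigInteger.valueOf(t), "k" + t);
        }
        BigInteger whole = BigInteger.valueOf(55); // start == end: the entire ring
        System.out.println(scan(ring, whole, whole, 3));
    }
}
```

The loop only makes progress if the cursor is expressed in the same
token space that orders the rows, which is why the way the next start
token is derived from the last key matters.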

In the second range query, ( 292996472659622939455744264432842142924, 
34571752641348786448680284622901156834 ], what comes back seems to be exactly 
what the range suggests: rows whose MD5 hashes fall in that range. But some 
of the remaining 80 rows we want may be outside that range. Hence only 33 rows 
below.

If the rows returned by the token-based range queries were in MD5 hash order 
(and ideally handled wraps), then it seems like this interface could work. But 
others seem to be using this functionality successfully, which suggests that 
this is somehow unnecessary. Can someone help me out here?

Thanks,
bnc

--------------------------------------------------------------------------------

Scanning range 0 ( 34571752641348786448680284622901156834, 34571752641348786448680284622901156834 ]
Scanning chunk ( 34571752641348786448680284622901156834, 34571752641348786448680284622901156834 ] in range 0
Read 20 rows
Read row 0, token 336932469034906281211924193433194809371, key 0_my_key62
Read row 1, token 5919946189209861803345840641668714978, key G_my_key16
Read row 2, token 6676056754427192599913432294390467082, key N_my_key85
Read row 3, token 330974738873996707017206868970060026330, key 6_my_key6
Read row 4, token 9595097897929687061907189837471352784, key E_my_key14
Read row 5, token 16575788966172751729835323651471549632, key a_my_key98
Read row 6, token 20927090112620661198733690835293074593, key 5_my_key67
Read row 7, token 28411545431179372696834683157677733478, key B_my_key73
Read row 8, token 29636277939148773659952116897998650776, key Q_my_key26
Read row 9, token 31186550159320208451777665196866508345, key j_my_key45
Read row 10, token 309081729348188654502493750295907191249, key D_my_key75
Read row 11, token 308480936859450293438865473928962136114, key W_my_key32
Read row 12, token 33060929359846763792204741553927689627, key Q_my_key88
Read row 13, token 36834373239213294576855495985365240744, key D_my_key13
Read row 14, token 302818545694924710056493830778421143168, key C_my_key12
Read row 15, token 39723252966237722984897584840501933181, key I_my_key18
Read row 16, token 297899763604776667052026292305780186395, key 2_my_key2
Read row 17, token 45994786947573748381278100108617428931, key U_my_key92
Read row 18, token 294076607175826631726358986726954934589, key T_my_key29
Read row 19, token 292996472659622939455744264432842142924, key M_my_key84
Scanning chunk ( 292996472659622939455744264432842142924, 34571752641348786448680284622901156834 ] in range 0
Read 13 rows
Read row 20, token 336932469034906281211924193433194809371, key 0_my_key62
Read row 21, token 5919946189209861803345840641668714978, key G_my_key16
Read row 22, token 6676056754427192599913432294390467082, key N_my_key85
Read row 23, token 330974738873996707017206868970060026330, key 6_my_key6
Read row 24, token 9595097897929687061907189837471352784, key E_my_key14
Read row 25, token 16575788966172751729835323651471549632, key a_my_key98
Read row 26, token 20927090112620661198733690835293074593, key 5_my_key67
Read row 27, token 28411545431179372696834683157677733478, key B_my_key73
Read row 28, token 29636277939148773659952116897998650776, key Q_my_key26
Read row 29, token 31186550159320208451777665196866508345, key j_my_key45
Read row 30, token 309081729348188654502493750295907191249, key D_my_key75
Read row 31, token 308480936859450293438865473928962136114, key W_my_key32
Read row 32, token 33060929359846763792204741553927689627, key Q_my_key88
Scanning chunk ( 33060929359846763792204741553927689627, 34571752641348786448680284622901156834 ] in range 0
Read 0 rows

--------------------------------------------------------------------------------
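
The 20 tokens logged for the first chunk above can be checked with plain
arithmetic. The sketch below assumes the printed tokens are the MD5
digest read as an unsigned 128-bit number and folds any value with the
top bit set to 2^128 minus that value (the magnitude it would have as a
signed integer). TokenOrderCheck and its methods are hypothetical names,
and this only examines the logged numbers; it does not establish what
the server does.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

final class TokenOrderCheck {
    static final BigInteger TWO_POW_128 = BigInteger.ONE.shiftLeft(128);

    // Tokens of rows 0..19, copied from the first chunk of the log above.
    static final String[] LOGGED = {
        "336932469034906281211924193433194809371", "5919946189209861803345840641668714978",
        "6676056754427192599913432294390467082", "330974738873996707017206868970060026330",
        "9595097897929687061907189837471352784", "16575788966172751729835323651471549632",
        "20927090112620661198733690835293074593", "28411545431179372696834683157677733478",
        "29636277939148773659952116897998650776", "31186550159320208451777665196866508345",
        "309081729348188654502493750295907191249", "308480936859450293438865473928962136114",
        "33060929359846763792204741553927689627", "36834373239213294576855495985365240744",
        "302818545694924710056493830778421143168", "39723252966237722984897584840501933181",
        "297899763604776667052026292305780186395", "45994786947573748381278100108617428931",
        "294076607175826631726358986726954934589", "292996472659622939455744264432842142924"
    };

    /** Folds an unsigned 128-bit value to the magnitude of its signed reading. */
    static BigInteger fold(BigInteger unsigned) {
        return unsigned.testBit(127) ? TWO_POW_128.subtract(unsigned) : unsigned;
    }

    static List<BigInteger> tokens(boolean folded) {
        List<BigInteger> out = new ArrayList<BigInteger>();
        for (String s : LOGGED) {
            BigInteger t = new BigInteger(s);
            out.add(folded ? fold(t) : t);
        }
        return out;
    }

    static boolean isAscending(List<BigInteger> ts) {
        for (int i = 1; i < ts.size(); i++) {
            if (ts.get(i - 1).compareTo(ts.get(i)) >= 0) {
                return false;
            }
        }
        return true;
    }

    /** Counts folded logged tokens inside (start, end], wrapping around when start >= end. */
    static int countInRange(BigInteger start, BigInteger end) {
        int n = 0;
        for (BigInteger t : tokens(true)) {
            boolean in = start.compareTo(end) < 0
                    ? t.compareTo(start) > 0 && t.compareTo(end) <= 0
                    : t.compareTo(start) > 0 || t.compareTo(end) <= 0;
            if (in) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("as logged, ascending: " + isAscending(tokens(false)));
        System.out.println("folded, ascending:    " + isAscending(tokens(true)));
        System.out.println("rows in second chunk range: " + countInRange(
                new BigInteger("292996472659622939455744264432842142924"),
                new BigInteger("34571752641348786448680284622901156834")));
    }
}
```

On the logged values, the folded tokens are strictly ascending in the
order the rows were returned, and 13 of the 20 fall inside the second
chunk's range, the same count as the second chunk in the log (which
repeats rows 0 through 12 of the first). That is consistent with the
rows coming back in token order, but it is only arithmetic on this log.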

Re: Reading all rows in a column family in parallel

Posted by Thomas Heller <in...@zilence.net>.
Hey,

> .... Is
> this possible in 0.6.0? (Note: for the next startToken, I was just planning
> on computing the MD5 digest of the last key directly since I'm accessing
> Cassandra through Thrift.)

Can't speak for 0.6.0 but it works for 0.6.3.

Just implemented this in ruby (minus the parallel part).

Cheers,
/thomas