Posted to dev@cassandra.apache.org by al...@ceid.upatras.gr on 2011/01/12 12:03:12 UTC

Send reads concurrently

Hello,

I am reading through getRangeSlice() in StorageProxy, and I am trying to
do roughly the same thing for a join operation that I want to implement in
Cassandra.

I see that getRangeSlice() loops through all available ranges, and for
each range, it sends a request to the applicable nodes and then handles
their answers *before* advancing to the next range. That may be fine for
getRangeSlice(), but I'd like to send requests for all ranges at once and
collect and handle the responses asynchronously (as they arrive).

I tried the following (pseudocode mixed with code):

<create a list of handlers>

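// Phase 1: send the requests for every range without waiting for replies.
for (AbstractBounds range : ranges)
{
    <create handler for this range>

    // entry.getValue() stands for the endpoints of this range
    // (entry is not defined in this excerpt).
    for (InetAddress endpoint : entry.getValue())
    {
        MessagingService.instance.sendRR(message, endpoint, handler);
    }

    handlers.add(handler);
}

// Phase 2: block on each handler in turn and collect its response.
for (QuorumResponseHandler handler : handlers)
{
    List<ByteBuffer> response = handler.get();
}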

However, my client gets a TimedOutException, and I think Cassandra blocks
during the resolve() call in get().
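
For illustration, the intended pattern (send everything first, collect
afterwards) can be sketched outside Cassandra with plain
java.util.concurrent. This is a minimal sketch, not StorageProxy code:
readRange(), readAll() and the use of String for a range are hypothetical
stand-ins, and it assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical scatter-gather sketch; none of these names come from Cassandra.
class ScatterGatherSketch
{
    // Stand-in for one remote range read. A real implementation would send a
    // message and complete the future from the response callback.
    static String readRange(String range)
    {
        return "rows for " + range;
    }

    static List<String> readAll(List<String> ranges, long timeoutMillis)
    {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try
        {
            // Phase 1: start every request before waiting on any of them.
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String range : ranges)
            {
                pending.add(CompletableFuture.supplyAsync(() -> readRange(range), pool));
            }

            // Phase 2: collect the results in range order; each wait is bounded.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending)
            {
                try
                {
                    results.add(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
                }
                catch (Exception e)
                {
                    throw new RuntimeException("range read failed or timed out", e);
                }
            }
            return results;
        }
        finally
        {
            pool.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(readAll(Arrays.asList("range-1", "range-2"), 1000));
    }
}
```

Each request owns its own future here, so a reply for one range can never
be delivered to the waiter of another.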

I am using 0.7.0rc1, but I don't think this code has changed much since then.

Any ideas?

Alexander

Re: Send reads concurrently

Posted by al...@ceid.upatras.gr.
I found the solution to this problem: I was sending the same message, with
the same ID, to all nodes, and that was apparently what caused the
problem. In any case, it is solved now.
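
One plausible reading of this fix, offered as an assumption rather than
something confirmed in this thread: if the messaging layer keeps a single
pending callback per message ID, then registering a second handler under
the same ID replaces the first, and the earlier handlers never see a reply.
The toy registry below is not Cassandra code; CallbackRegistrySketch,
register() and reachableHandlers() are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a callback registry keyed by message ID (assumed design).
class CallbackRegistrySketch
{
    private final Map<String, String> callbacks = new HashMap<>();

    // Remember which handler is waiting for replies to messageId.
    // A second registration under the same ID overwrites the first.
    void register(String messageId, String handlerName)
    {
        callbacks.put(messageId, handlerName);
    }

    // Registers one handler per range and reports how many handlers
    // can still be reached through the registry afterwards.
    static int reachableHandlers(boolean freshIdPerRange)
    {
        CallbackRegistrySketch registry = new CallbackRegistrySketch();
        String[] ranges = { "range-1", "range-2", "range-3" };
        for (int i = 0; i < ranges.length; i++)
        {
            String messageId = freshIdPerRange ? "msg-" + i : "msg-0";
            registry.register(messageId, "handler-for-" + ranges[i]);
        }
        return registry.callbacks.size();
    }

    public static void main(String[] args)
    {
        System.out.println(reachableHandlers(false)); // prints 1
        System.out.println(reachableHandlers(true));  // prints 3
    }
}
```

Under that assumption, only the last handler registered for a shared ID can
ever be signalled, which would match a get() that times out on the others.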

Alexander



Re: Send reads concurrently

Posted by al...@ceid.upatras.gr.
Thank you for your answer; however, I am pretty sure that's not it. I have
a small two-node cluster for development testing, and I have loaded it
with data such that responses to my queries usually contain about 50000
short rows (which I think is not very much).

First of all, if I do it like this, it works:

for (AbstractBounds range : ranges)
{
    <create handler for this range>

    for (InetAddress endpoint : entry.getValue())
    {
        MessagingService.instance.sendRR(message, endpoint, handler);
    }

    // Block here until this range is answered, then move on to the next one.
    List<ByteBuffer> response = handler.get();
}

So it's not a matter of rpc_timeout. However, just in case, I increased
rpc_timeout to 40000 ms (which is longer than any of my queries take with
the above method), and the problem still occurred.

My problem is that I don't want to wait for all the results for one range
before sending the request to another one.

I also added some debugging to QuorumResponseHandler.get() and found out
exactly where the problem occurs. This call (QRH, line 64 in rc1):

success = condition.await(timeout, TimeUnit.MILLISECONDS);

never returns true with my approach.
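
The behaviour of a timed wait that is never signalled can be reproduced in
isolation. The sketch below uses java.util.concurrent.CountDownLatch in
place of the condition object used by QuorumResponseHandler, which is a
substitution made for the example; TimedWaitSketch, response() and demo()
are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal stand-in for a response handler that waits for one reply.
class TimedWaitSketch
{
    private final CountDownLatch done = new CountDownLatch(1);

    // Called by the (hypothetical) messaging thread when a reply arrives.
    void response()
    {
        done.countDown();
    }

    // Returns true if a reply arrived within the timeout, false otherwise.
    boolean await(long timeoutMillis)
    {
        try
        {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Waits up to 50 ms on a fresh handler, optionally delivering a reply first.
    static boolean demo(boolean deliverReply)
    {
        TimedWaitSketch handler = new TimedWaitSketch();
        if (deliverReply)
        {
            handler.response();
        }
        return handler.await(50);
    }

    public static void main(String[] args)
    {
        System.out.println(demo(true));  // prints true
        System.out.println(demo(false)); // prints false
    }
}
```

A false result therefore says only that nothing signalled this particular
handler in time; it does not say whether the remote node did the work.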

The puzzling thing is that strongRead does exactly what I want to do, and
it obviously works, but I can't see what I am doing differently. One thing
to note, maybe, is that I also send a request to the node that is itself
sending the requests, and I am wondering whether there is some kind of
deadlock between the different threads that handle the verb and the
requests.

Alexander



Re: Send reads concurrently

Posted by Jonathan Ellis <jb...@gmail.com>.
I would guess that sending a bunch of range requests simultaneously
overwhelms the targets (range scans are expensive), so you're timing
out simply because they couldn't finish all of them within rpc_timeout.

Solution: don't do that, or increase rpc_timeout.




-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com