Posted to user@cassandra.apache.org by Joost Ouwerkerk <jo...@openplaces.org> on 2010/05/12 19:32:04 UTC

Load Balancing Mapper Tasks

I've been trying to improve the time it takes to map 30 million rows using a
Hadoop/Cassandra cluster with 30 nodes. I discovered that since
CassandraInputFormat returns an ordered list of splits, when there are many
splits (e.g. hundreds or more) the load on Cassandra is horribly unbalanced.
For example, if I have 30 tasks processing 600 splits, then the first 30
splits are all located on the same one or two nodes.
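To make the arithmetic in that example concrete, here is a small Java sketch. It assumes, hypothetically, that the 600 splits are spread evenly over the 30 nodes in token order (20 contiguous splits per node) and that the 30 tasks pick up splits in list order. The class and method names are illustrative only, not part of Cassandra or Hadoop.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Worked arithmetic for 600 token-ordered splits on a 30-node ring with
 * 30 concurrent map tasks. Assumes (hypothetically) that each node owns an
 * equal, contiguous run of splits, so split i lives on node i / 20.
 */
public class OrderedSplitsDemo {
    static final int NODES = 30;
    static final int SPLITS = 600;
    static final int CONCURRENT_TASKS = 30;

    /** Node that owns split i when splits are returned in ring order. */
    static int nodeFor(int split) {
        int splitsPerNode = SPLITS / NODES; // 20 under the even-spread assumption
        return split / splitsPerNode;
    }

    /** Number of distinct nodes hit by the first `tasks` splits in list order. */
    static int nodesHitByFirstWave(int tasks) {
        Set<Integer> nodes = new HashSet<>();
        for (int i = 0; i < tasks; i++) {
            nodes.add(nodeFor(i));
        }
        return nodes.size();
    }

    public static void main(String[] args) {
        // Splits 0-19 live on node 0 and splits 20-29 on node 1.
        System.out.println(nodesHitByFirstWave(CONCURRENT_TASKS)); // prints 2
    }
}
```

Under these assumptions the whole first wave of 30 tasks lands on just two of the 30 nodes, which matches the "same one or two nodes" observation.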

I added `Collections.shuffle(splits)` before returning the splits in
getSplits(). As a result, the load is much better distributed, throughput
increased (about 3X in my case), and TimedOutExceptions were all but
eliminated.
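A minimal sketch of the same fix, assuming a plain `List` stands in for the split list that getSplits() builds (the `ShuffledSplits` class and its helpers are hypothetical). In the real InputFormat the change is just the `Collections.shuffle(splits)` call before the list is returned; the seeded `Random` here only makes the demo reproducible. It reuses the same hypothetical even, contiguous placement of 20 splits per node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/**
 * Sketch: shuffle the split list before handing it to Hadoop so that the
 * first wave of concurrent tasks is spread across many nodes. The element
 * type is a generic stand-in for the InputSplit objects of a real job.
 */
public class ShuffledSplits {

    /** Returns a shuffled copy; the caller's list is left untouched. */
    static <T> List<T> shuffled(List<T> splits, Random rnd) {
        List<T> copy = new ArrayList<>(splits);
        Collections.shuffle(copy, rnd);
        return copy;
    }

    /** Hypothetical placement: split i lives on node i / splitsPerNode. */
    static int distinctNodesInFirst(List<Integer> splits, int tasks, int splitsPerNode) {
        Set<Integer> nodes = new HashSet<>();
        for (int split : splits.subList(0, tasks)) {
            nodes.add(split / splitsPerNode);
        }
        return nodes.size();
    }

    public static void main(String[] args) {
        List<Integer> ordered = new ArrayList<>();
        for (int i = 0; i < 600; i++) {
            ordered.add(i);
        }
        List<Integer> mixed = shuffled(ordered, new Random(42));
        System.out.println("ordered:  " + distinctNodesInFirst(ordered, 30, 20) + " nodes");
        System.out.println("shuffled: " + distinctNodesInFirst(mixed, 30, 20) + " nodes");
    }
}
```

Shuffling is probabilistic: it does not guarantee that every node gets an equal share of each wave, but with hundreds of splits it breaks up the long runs of adjacent splits that concentrate the first tasks on one or two nodes.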

Joost.

Re: Load Balancing Mapper Tasks

Posted by Jonathan Ellis <jb...@gmail.com>.
That means those threads are idle: they are waiting for something to be added to the task queue.
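The stack trace quoted below can be reproduced with a plain `java.util.concurrent` pool. This sketch (the class and thread names are made up, and it does not use Cassandra's own executor) runs two trivial tasks on a fixed pool and then checks that the now-idle workers report `Thread.State.WAITING`, parked via `ThreadPoolExecutor.getTask()` and `LinkedBlockingQueue.take()` as in ROW-READ-STAGE:6. A worker in that state has no work; it is not contending for a lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class IdleWorkerDemo {

    /** Runs two trivial tasks, then reports the state of the idle workers. */
    static Thread.State idleWorkerState() {
        List<Thread> workers = new ArrayList<>();
        // Record every worker the pool creates so we can inspect it later.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "DEMO-READ-STAGE:" + workers.size());
            workers.add(t);
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        try {
            Future<?> a = pool.submit(() -> { });
            Future<?> b = pool.submit(() -> { });
            a.get();
            b.get();
            // After finishing its task, each worker loops back to getTask()
            // and parks in LinkedBlockingQueue.take(); poll briefly for that.
            for (int i = 0; i < 200; i++) {
                boolean allParked = workers.stream()
                        .allMatch(t -> t.getState() == Thread.State.WAITING);
                if (allParked) {
                    return Thread.State.WAITING;
                }
                Thread.sleep(10);
            }
            return workers.get(0).getState();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(idleWorkerState());
    }
}
```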

On Mon, May 17, 2010 at 9:42 AM, Joost Ouwerkerk <jo...@openplaces.org> wrote:
> At any given moment at least half of those threads are in the following
> state; what does it represent?
> Name: ROW-READ-STAGE:6
> State: WAITING on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@fea6030
> Total blocked: 44  Total waited: 479
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> java.lang.Thread.run(Thread.java:619)
>
> On Mon, May 17, 2010 at 12:53 AM, Jonathan Ellis <jb...@gmail.com> wrote:
>>
>> On Sun, May 16, 2010 at 2:52 PM, Joost Ouwerkerk <jo...@openplaces.org>
>> wrote:
>> > Meanwhile, I'm still getting TimedOutException errors when mapping this
>> > 30-million-row table, even when retrieving no data at all. It looks like
>> > it is related to disk activity on "hot" nodes (when the same Cassandra
>> > node has to handle many concurrent requests for adjacent range slices).
>> > Using the 0.7 trunk branch doesn't appear to alleviate it. CPU load is at
>> > about 25% when this happens. Is there some kind of synchronization that
>> > might prevent the same file from being scanned by multiple threads?
>>
>> No, but there are only `concurrent_reads` threads serving reads, and if
>> you have a lot of ops in the queue ahead of you, then that would do it.
>> (Check the pending tasks for the row stage under cassandra.concurrent in
>> JMX.)
>>
>> --
>> Jonathan Ellis
>> Project Chair, Apache Cassandra
>> co-founder of Riptano, the source for professional Cassandra support
>> http://riptano.com

Re: Load Balancing Mapper Tasks

Posted by Joost Ouwerkerk <jo...@openplaces.org>.
At any given moment at least half of those threads are in the following
state; what does it represent?

Name: ROW-READ-STAGE:6
State: WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@fea6030
Total blocked: 44  Total waited: 479

Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
java.lang.Thread.run(Thread.java:619)


On Mon, May 17, 2010 at 12:53 AM, Jonathan Ellis <jb...@gmail.com> wrote:

> On Sun, May 16, 2010 at 2:52 PM, Joost Ouwerkerk <jo...@openplaces.org>
> wrote:
> > Meanwhile, I'm still getting TimedOutException errors when mapping this
> > 30-million-row table, even when retrieving no data at all. It looks like it
> > is related to disk activity on "hot" nodes (when the same Cassandra node has
> > to handle many concurrent requests for adjacent range slices). Using the 0.7
> > trunk branch doesn't appear to alleviate it. CPU load is at about 25% when
> > this happens. Is there some kind of synchronization that might prevent the
> > same file from being scanned by multiple threads?
>
> No, but there are only `concurrent_reads` threads serving reads, and if
> you have a lot of ops in the queue ahead of you, then that would do it.
> (Check the pending tasks for the row stage under cassandra.concurrent in JMX.)

Re: Load Balancing Mapper Tasks

Posted by Jonathan Ellis <jb...@gmail.com>.
On Sun, May 16, 2010 at 2:52 PM, Joost Ouwerkerk <jo...@openplaces.org> wrote:
> Meanwhile, I'm still getting TimedOutException errors when mapping this
> 30-million-row table, even when retrieving no data at all. It looks like it
> is related to disk activity on "hot" nodes (when the same Cassandra node has
> to handle many concurrent requests for adjacent range slices). Using the 0.7
> trunk branch doesn't appear to alleviate it. CPU load is at about 25% when
> this happens. Is there some kind of synchronization that might prevent the
> same file from being scanned by multiple threads?

No, but there are only `concurrent_reads` threads serving reads, and if
you have a lot of ops in the queue ahead of you, then that would do it.
(Check the pending tasks for the row stage under cassandra.concurrent in JMX.)
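A toy model of that effect, assuming the read stage behaves like a fixed-size `ThreadPoolExecutor` backed by an unbounded queue. The `ReadStageBacklog` class, the thread count, and the request count are illustrative assumptions, not Cassandra's code or defaults. Each simulated read blocks on a latch standing in for a slow disk scan, so once every thread is busy, further requests pile up as pending tasks, which is the number to watch in JMX.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReadStageBacklog {

    /**
     * Submits `requests` reads to a stage with `threads` workers while every
     * read is stuck on a simulated slow disk, and returns how many are queued.
     */
    static int pendingAfterBurst(int threads, int requests) {
        ThreadPoolExecutor stage = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch slowDisk = new CountDownLatch(1); // released only at the end
        try {
            for (int i = 0; i < requests; i++) {
                stage.execute(() -> {
                    try {
                        slowDisk.await(); // stands in for a long range-slice scan
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first `threads` requests each start a worker directly;
            // all later ones wait in the queue as pending tasks.
            return stage.getQueue().size();
        } finally {
            slowDisk.countDown();
            stage.shutdown();
        }
    }

    public static void main(String[] args) {
        // e.g. 8 read threads and 30 concurrent range-slice requests
        System.out.println(pendingAfterBurst(8, 30) + " requests pending"); // prints "22 requests pending"
    }
}
```

In this model a request near the back of the queue waits for every read ahead of it to finish, so it can plausibly exceed a client timeout even while CPU usage stays low.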


Re: Load Balancing Mapper Tasks

Posted by Joost Ouwerkerk <jo...@openplaces.org>.
Hadoop doesn't make any assumptions about how input source data is
distributed. It can't 'know' that the data for the first 30 splits emitted
by the InputFormat are all stored on the same Cassandra node.

The new JIRA issue with the patch is CASSANDRA-1096.

Meanwhile, I'm still getting TimedOutException errors when mapping this
30-million-row table, even when retrieving no data at all. It looks like it
is related to disk activity on "hot" nodes (when the same Cassandra node has
to handle many concurrent requests for adjacent range slices). Using the 0.7
trunk branch doesn't appear to alleviate it. CPU load is at about 25% when
this happens. Is there some kind of synchronization that might prevent the
same file from being scanned by multiple threads?

joost.

On Sat, May 15, 2010 at 10:55 AM, Jonathan Ellis <jb...@gmail.com> wrote:

> Oh, very interesting.  I assumed Hadoop would be smart enough to
> load-balance the jobs it sends out.  Guess not.
>
> Can you submit a patch?
>
> On Wed, May 12, 2010 at 12:32 PM, Joost Ouwerkerk <jo...@openplaces.org>
> wrote:
> > I've been trying to improve the time it takes to map 30 million rows using
> > a Hadoop/Cassandra cluster with 30 nodes. I discovered that since
> > CassandraInputFormat returns an ordered list of splits, when there are many
> > splits (e.g. hundreds or more) the load on Cassandra is horribly
> > unbalanced. For example, if I have 30 tasks processing 600 splits, then the
> > first 30 splits are all located on the same one or two nodes.
> > I added `Collections.shuffle(splits)` before returning the splits in
> > getSplits(). As a result, the load is much better distributed, throughput
> > increased (about 3X in my case), and TimedOutExceptions were all but
> > eliminated.
> > Joost.

Re: Load Balancing Mapper Tasks

Posted by Jonathan Ellis <jb...@gmail.com>.
Oh, very interesting.  I assumed Hadoop would be smart enough to
load-balance the jobs it sends out.  Guess not.

Can you submit a patch?

On Wed, May 12, 2010 at 12:32 PM, Joost Ouwerkerk <jo...@openplaces.org> wrote:
> I've been trying to improve the time it takes to map 30 million rows using a
> Hadoop/Cassandra cluster with 30 nodes. I discovered that since
> CassandraInputFormat returns an ordered list of splits, when there are many
> splits (e.g. hundreds or more) the load on Cassandra is horribly unbalanced.
> For example, if I have 30 tasks processing 600 splits, then the first 30
> splits are all located on the same one or two nodes.
> I added `Collections.shuffle(splits)` before returning the splits in
> getSplits(). As a result, the load is much better distributed, throughput
> increased (about 3X in my case), and TimedOutExceptions were all but
> eliminated.
> Joost.


