Posted to user@cassandra.apache.org by Subscriber <su...@zfabrik.de> on 2011/04/29 18:45:08 UTC

Experiences with Map&Reduce Stress Tests

Hi all, 

We would like to share the experience we gained during our evaluation of Cassandra with Hadoop Map/Reduce.
Our question was whether Cassandra is suitable for massive distributed data writes using Hadoop's Map/Reduce feature.

Our setup is described in the attached file 'cassandra_stress_setup.txt'.



The stress test uses 800 map tasks to generate data and store it in Cassandra.
Each map task writes 500,000 items (i.e. rows), for a total of 400,000,000 items.
At most 8 map tasks run in parallel on each node. Besides the key, an item contains two long and two double values,
so items are a few hundred bytes in size. This leads to a total data size of approximately 120 GB.
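
The figures above can be cross-checked with a little arithmetic. The sketch below assumes an average of 300 bytes per item. That value is an illustrative assumption, chosen because it is consistent with "a few hundred bytes" and the stated total of roughly 120 GB; it is not a measured number.

```python
# Back-of-the-envelope check of the stress-test data volume.
MAP_TASKS = 800
ITEMS_PER_TASK = 500_000
ASSUMED_BYTES_PER_ITEM = 300  # assumption: "a few hundred bytes" per item

total_items = MAP_TASKS * ITEMS_PER_TASK
total_bytes = total_items * ASSUMED_BYTES_PER_ITEM

print(f"total items: {total_items:,}")
print(f"total size:  {total_bytes / 1e9:.0f} GB")
```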

The map tasks use the Hector API. Hector is "fed" with all three data nodes. The data is written in chunks of 1,000 items.
The ConsistencyLevel is set to ONE.
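
The buffer-and-flush pattern described here can be sketched roughly as follows. This is not the actual mapper code: `write_in_chunks` and `send_batch` are hypothetical names, and `send_batch` merely stands in for whatever client call performs the batch write (in the real test, a Hector mutator).

```python
from typing import Callable, Iterable, List, Tuple

Item = Tuple[str, dict]  # (row key, column name -> value)


def write_in_chunks(items: Iterable[Item],
                    send_batch: Callable[[List[Item]], None],
                    chunk_size: int = 1000) -> int:
    """Buffer items and pass them to send_batch in chunks of chunk_size.

    Returns the number of batches sent.
    """
    buffer: List[Item] = []
    batches = 0
    for item in items:
        buffer.append(item)
        if len(buffer) >= chunk_size:
            send_batch(buffer)
            batches += 1
            buffer = []  # start a fresh list; the callee may keep the old one
    if buffer:  # flush the final, possibly partial chunk
        send_batch(buffer)
        batches += 1
    return batches


# Demo with a stand-in sender that only records the batch sizes.
sizes: List[int] = []
demo_items = ((f"item-{i}", {"a": i, "b": float(i)}) for i in range(2500))
batch_count = write_in_chunks(demo_items, lambda batch: sizes.append(len(batch)))
print(batch_count, sizes)
```

The final flush matters: without it, the last partial chunk would be lost whenever the item count is not a multiple of the chunk size.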

We ran the stress test several times with different configuration settings (for example, I started with Cassandra's default configuration, and I used Pelops for another test).

Our observations are as follows:

1) Cassandra is really fast - we are very impressed by the huge write throughput. A map task writing 500,000 items (approx. 200 MB) usually finishes in under 5 minutes.
2) Unfortunately, however, all tests failed in the end.

In the beginning there are no problems. The first 100 (in some tests the first 300(!)) map tasks look fine. But then the trouble starts.

Hadoop's sample output after ~15 minutes:

Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     14.99%      800        680      24       96        0       0 / 0
reduce  3.99%       1          0        1        0         0       0 / 0

Some stats:
>top
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                                                                                              
31159 xxxx      20   0 2569m 2.2g 9.8m S  450 18.6  61:44.73 java                                                                                                                                                                                                   

>vmstat 1 5
procs -----------memory---------- ---swap-- -----io---- -system-- ----cpu----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa
 2  1  36832 353688 242820 6837520    0    0    15    73    3    2  3  0 96  0
11  1  36832 350992 242856 6852136    0    0  1024 20900 4508 11738 19  1 74  6
 8  0  36832 339728 242876 6859828    0    0     0  1068 45809 107008 69 10 20  0
 1  0  36832 330212 242884 6868520    0    0     0    80 42112 92930 71  8 21  0
 2  0  36832 311888 242908 6887708    0    0  1024     0 20277 46669 46  7 47  0

>cassandra/bin/nodetool -h tirdata1 -p 28080 ring
Address         Status State   Load            Owns    Token                                       
                                                       113427455640312821154458202477256070484     
192.168.11.198  Up     Normal  6.72 GB         33.33%  0                                           
192.168.11.199  Up     Normal  6.72 GB         33.33%  56713727820156410577229101238628035242      
192.168.11.202  Up     Normal  6.68 GB         33.33%  113427455640312821154458202477256070484     
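
The three tokens in the ring output above match an even split of the token range 0 to 2**127, which is the range used by Cassandra's RandomPartitioner. The following is a minimal sketch of that calculation, assuming RandomPartitioner is in use (the message does not name the partitioner); `balanced_tokens` is a name made up for this illustration.

```python
# Evenly spaced initial tokens, assuming RandomPartitioner (token space 0 .. 2**127).
TOKEN_SPACE = 2 ** 127


def balanced_tokens(node_count: int) -> list:
    """Return one token per node, spaced evenly across the token space."""
    step = TOKEN_SPACE // node_count  # integer division, rounds down
    return [i * step for i in range(node_count)]


for token in balanced_tokens(3):
    print(token)
```

For three nodes this prints 0, 56713727820156410577229101238628035242 and 113427455640312821154458202477256070484, the same values as in the Token column.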


Hadoop's sample output after ~20 minutes:

Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     15.49%      800        673      24       103       0       6 / 0
reduce  4.16%       1          0        1        0         0       0 / 0
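
Comparing the two snapshots gives a rough idea of the slowdown. The sketch below uses only the "Complete" counts and the approximate timestamps of the two tables. Because the times are approximate and the partial progress of running tasks is ignored, the results are order-of-magnitude estimates only.

```python
ITEMS_PER_TASK = 500_000

# (elapsed seconds, completed map tasks) taken from the two snapshots
first = (15 * 60, 96)
second = (20 * 60, 103)

# Average write rate over the first ~15 minutes
avg_rate_first = first[1] * ITEMS_PER_TASK / first[0]

# Write rate between the ~15 minute and ~20 minute snapshots
rate_between = (second[1] - first[1]) * ITEMS_PER_TASK / (second[0] - first[0])

print(f"first 15 min: ~{avg_rate_first:,.0f} items/s")
print(f"15-20 min:    ~{rate_between:,.0f} items/s")
```

With these numbers the rate works out to roughly 53,000 items/s averaged over the first 15 minutes, against roughly 12,000 items/s between the two snapshots.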

What went wrong? It's always the same. The clients cannot reach the nodes anymore. 

java.lang.RuntimeException: work failed
	at com.zfabrik.hadoop.impl.HadoopProcessRunner.work(HadoopProcessRunner.java:109)
	at com.zfabrik.hadoop.impl.DelegatingMapper.run(DelegatingMapper.java:40)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:625)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
	at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at com.zfabrik.hadoop.impl.HadoopProcessRunner.work(HadoopProcessRunner.java:107)
	... 4 more
Caused by: java.lang.RuntimeException: me.prettyprint.hector.api.exceptions.HUnavailableException: : May not be enough replicas present to handle consistency level.
	at com.zfabrik.hadoop.impl.DelegatingMapper$1.run(DelegatingMapper.java:47)
	at com.zfabrik.work.WorkUnit.work(WorkUnit.java:342)
	at com.zfabrik.impl.launch.ProcessRunnerImpl.work(ProcessRunnerImpl.java:189)
	... 9 more
Caused by: me.prettyprint.hector.api.exceptions.HUnavailableException: : May not be enough replicas present to handle consistency level.
	at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:52)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:95)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:88)
	at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
	at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:221)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:129)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl.batchMutate(KeyspaceServiceImpl.java:100)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl.batchMutate(KeyspaceServiceImpl.java:106)
	at me.prettyprint.cassandra.model.MutatorImpl$2.doInKeyspace(MutatorImpl.java:203)
	at me.prettyprint.cassandra.model.MutatorImpl$2.doInKeyspace(MutatorImpl.java:200)
	at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20)
	at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85)
	at me.prettyprint.cassandra.model.MutatorImpl.execute(MutatorImpl.java:200)
	at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper._flush(HectorBasedMassItemGenMapper.java:122)
	at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper.map(HectorBasedMassItemGenMapper.java:103)
	at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper.map(HectorBasedMassItemGenMapper.java:1)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at com.zfabrik.hadoop.impl.DelegatingMapper$1.run(DelegatingMapper.java:45)
	... 11 more
Caused by: UnavailableException()
	at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:16485)
	at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:916)
	at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:890)
	at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:93)
	... 27 more

-------
Task attempt_201104291345_0001_m_000028_0 failed to report status for 602 seconds. Killing!

I also observed that, when connected to cassandra-cli during the stress test, it was not possible to list the items written so far:

[default@unknown] use ItemRepo;
Authenticated to keyspace: ItemRepo
[default@ItemRepo] list Items;
Using default limit of 100
Internal error processing get_range_slices


It seems to me that from the moment I performed the read operation in the CLI tool, the node became somehow confused.
JConsole shows that up to this point the heap is healthy: it grows and GC clears it again.
But from this point on, GC doesn't really help anymore (see attached screenshot).





This also has an impact on the other nodes, as one can see in the second screenshot: CPU usage goes down, as does heap memory usage.


I'll run another stress test at the weekend with

 * MAX_HEAP_SIZE="4G"
 * HEAP_NEWSIZE="400M"

Best Regards
Udo


Re: Experiences with Map&Reduce Stress Tests

Posted by Jeremy Hanna <je...@gmail.com>.
Writing to Cassandra from map/reduce jobs over HDFS shouldn't be a problem.  We're doing it in our cluster and I know of others doing the same thing.  You might just make sure the number of reducers (or mappers) writing to Cassandra doesn't overwhelm it.  There's no data locality for writes, though a Cassandra-specific partitioner might help with that in the future.  See CASSANDRA-1473 - https://issues.apache.org/jira/browse/CASSANDRA-1473.

I apologize that I misspoke about one of the settings.  The batch size is in fact the number of rows it gets each time.  The input split size just affects how many mappers the data is split across.

As far as recommending this solution, it really depends on the problem.  The people I know doing what you're thinking of doing typically store raw data in HDFS, perform mapreduce jobs over that data and output the results into Cassandra for realtime queries.

We're using it where I work for both storage and analytics.  We store raw data in S3/HDFS, run mapreduce over that data and output into Cassandra, then perform realtime queries as well as analytics over that data.  If you want to run analytics over Cassandra data, you'll want to partition your cluster so that mapreduce jobs don't affect the realtime performance.



Re: Experiences with Map&Reduce Stress Tests

Posted by Subscriber <su...@zfabrik.de>.
Hi Jeremy, 

yes, the setup on the data-nodes is:
	- Hadoop DataNode
	- Hadoop TaskTracker
	- CassandraDaemon
 
However - the map-input is not read from Cassandra. I am running a writing stress test - no reads (well from time to time I check the produced items using cassandra-cli).
Is it possible to achieve data-locality on writes? Well I think that this is (in practice) not possible (one could create some artificial data that correlates with the hashed row-key values or so ... ;-)

Thanks for all your tips and hints! It's good to see that someone worries about my problems :-)
But - to be honest - my number one priority is not to get this test running but to answer the question of whether a Cassandra+Hadoop setup with massively parallel writes (using map/reduce) meets the demands of our customer.

I found out that the following configuration helps a lot. 
 * disk_access_mode: standard 
 * MAX_HEAP_SIZE="4G"
 * HEAP_NEWSIZE="400M"
 * rpc_timeout_in_ms: 20000

Now the stress test runs through, but there are still timeouts (Hadoop reschedules the failing mapper tasks on another node, so the test completes).
But what causes these timeouts? 20 seconds is a long time for a modern CPU (and an eternity for an android ;-)

It seems to me that it's not only the massive amount of data or too many parallel mappers, because Cassandra can handle this huge write rate for over an hour!
I found in the system.log files that the ConcurrentMarkSweep collections take quite long (up to 8 seconds). The heap size didn't grow much above 3 GB, so there was still "enough air to breathe".

So the question remains: can I recommend this setup?

Thanks again and best regards
Udo


On 02.05.2011 at 20:21, Jeremy Hanna wrote:

> Udo,
> 
> One thing to get out of the way - you're running task trackers on all of your cassandra nodes, right?  That is the first and foremost way to get good performance.  Otherwise you don't have data locality, which is really the point of map/reduce, co-locating your data and your processes operating over that data.  You're probably already doing that, but I had forgotten to ask that before.
> 
> Besides that...
> 
> You might try messing with those values a bit more as well as the input split size - cassandra.input.split.size which defaults to ~65k.  So you might try rpc timeout of 30s just to see if that helps and try reducing the input split size significantly to see if that helps.
> 
> For your setup I don't see the range batch size as being meaningful at all with your narrow rows, so don't worry about that.
> 
> Also, the capacity of your nodes and the number of mappers/reducers you're trying to use will affect whether it times out.  Essentially it's getting overwhelmed for some reason.  You might lower the number of mappers and reducers you're hitting your Cassandra cluster with to see if that helps.
> 
> Jeremy
> 
> On May 2, 2011, at 6:25 AM, Subscriber wrote:
> 
>> Hi Jeremy, 
>> 
>> thanks for the link.
>> I doubled the rpc_timeout (20 seconds) and reduced the range-batch-size to 2048, but I still get timeouts...
>> 
>> Udo
>> 
>> On 29.04.2011 at 18:53, Jeremy Hanna wrote:
>> 
>>> It sounds like there might be some tuning you can do to your jobs - take a look at the wiki's HadoopSupport page, specifically the Troubleshooting section:
>>> http://wiki.apache.org/cassandra/HadoopSupport#Troubleshooting
>>> 
>>>> 
>>>> 
>>>> <Bildschirmfoto 2011-04-29 um 18.30.14.png>
>>>> 
>>>> 
>>>> This has also impact on the other nodes as one can see in the second screenshot. The CPU Usage goes down as well as the heap memory usage.  
>>>> <Bildschirmfoto 2011-04-29 um 18.36.34.png>
>>>> 
>>>> I'll run another stress test at the weekend with
>>>> 
>>>> * MAX_HEAP_SIZE="4G"
>>>> * HEAP_NEWSIZE="400M"
>>>> 
>>>> Best Regards
>>>> Udo
>>>> 
>>> 
>> 
> 


Re: Experiences with Map&Reduce Stress Tests

Posted by Jeremy Hanna <je...@gmail.com>.
Udo,

One thing to get out of the way - you're running task trackers on all of your cassandra nodes, right?  That is the first and foremost way to get good performance.  Otherwise you don't have data locality, which is really the point of map/reduce, co-locating your data and your processes operating over that data.  You're probably already doing that, but I had forgotten to ask that before.

Besides that...

You might try adjusting those values a bit more, as well as the input split size (cassandra.input.split.size, which defaults to ~65k). For example, try an rpc timeout of 30s, and try reducing the input split size significantly, to see whether either helps.
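
To make the effect of the split size concrete, here is a rough sketch of the arithmetic. It assumes one map task per input split and an accurate row estimate, which is a simplification; the class and method names are made up for illustration, the 400,000,000 figure is the total item count from the original post, and the smaller split size is just a hypothetical value.

```java
// Sketch only: relates a split size (rows per split) to the number of input splits.
// Assumption: one map task per split; real split computation may differ.
final class SplitSizeSketch {

    /** Number of splits needed to cover totalRows at rowsPerSplit rows each, rounded up. */
    static long splitCount(long totalRows, long rowsPerSplit) {
        if (totalRows < 0 || rowsPerSplit <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and rowsPerSplit > 0");
        }
        return (totalRows + rowsPerSplit - 1) / rowsPerSplit;
    }

    public static void main(String[] args) {
        long totalRows = 400_000_000L; // total item count from the original post
        long defaultSplit = 64 * 1024; // 65,536 rows, the "~65k" default
        long smallerSplit = 8 * 1024;  // hypothetical reduced value, for illustration only

        System.out.println("default split size: " + splitCount(totalRows, defaultSplit) + " splits");
        System.out.println("smaller split size: " + splitCount(totalRows, smallerSplit) + " splits");
    }
}
```

Under these assumptions the default gives 6,104 splits and the smaller value 48,829, so each task asks the cluster for far less data at a time, at the cost of more task scheduling overhead.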

For your setup I don't see the range batch size as being meaningful at all with your narrow rows, so don't worry about that.

The capacity of your nodes and the number of mappers/reducers you're running will also affect whether requests time out. Essentially, the cluster is getting overwhelmed for some reason. You might lower the number of mappers and reducers hitting your Cassandra cluster to see if that helps.
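
As a back-of-the-envelope sketch of what lowering the mapper count means for write load: the original post reports 8 map tasks per node, 24 running map tasks in the Hadoop status output (which matches three nodes), and 500,000 items per task in under 5 minutes. The class and method names below are invented for illustration, the 4 and 2 slots-per-node values are hypothetical, and the per-task rate is only a rough lower bound for a healthy cluster.

```java
// Sketch only: estimates the aggregate write rate for different map-slot settings.
// Assumption: every running map task writes at the same steady rate.
final class WriteLoadSketch {

    /** Map tasks writing at the same time across the cluster. */
    static int concurrentWriters(int nodes, int mapSlotsPerNode) {
        return nodes * mapSlotsPerNode;
    }

    /** Rows per second one task sustains if it writes rowsPerTask rows in taskSeconds. */
    static double rowsPerSecondPerTask(long rowsPerTask, long taskSeconds) {
        if (taskSeconds <= 0) {
            throw new IllegalArgumentException("taskSeconds must be > 0");
        }
        return (double) rowsPerTask / taskSeconds;
    }

    public static void main(String[] args) {
        int nodes = 3;                // three nodes, as in the ring output
        long rowsPerTask = 500_000L;  // items written by each map task
        long taskSeconds = 5 * 60;    // "under 5 minutes" per task
        double perTask = rowsPerSecondPerTask(rowsPerTask, taskSeconds);

        // 8 is the reported setting; 4 and 2 are hypothetical reduced settings.
        for (int slots : new int[] {8, 4, 2}) {
            int writers = concurrentWriters(nodes, slots);
            System.out.printf("%d slots/node -> %d writers, ~%.0f rows/s cluster-wide%n",
                    slots, writers, writers * perTask);
        }
    }
}
```

With the reported setting this works out to roughly 40,000 rows/s across the cluster; halving the slots per node halves that figure, which is the kind of relief the cluster may need.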

Jeremy

On May 2, 2011, at 6:25 AM, Subscriber wrote:

> Hi Jeremy, 
> 
> thanks for the link.
> I doubled the rpc_timeout (20 seconds) and reduced the range-batch-size to 2048, but I still get timeouts...
> 
> Udo
> 
> On 29.04.2011 at 18:53, Jeremy Hanna wrote:
> 
>> It sounds like there might be some tuning you can do to your jobs - take a look at the wiki's HadoopSupport page, specifically the Troubleshooting section:
>> http://wiki.apache.org/cassandra/HadoopSupport#Troubleshooting


Re: Experiences with Map&Reduce Stress Tests

Posted by Subscriber <su...@zfabrik.de>.
Hi Jeremy, 

Thanks for the link.
I doubled the rpc_timeout (to 20 seconds) and reduced the range-batch-size to 2048, but I still get timeouts...
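
For reference, a small sketch of what these two changes mean per range request when reading through the Hadoop input format. It assumes each range request fetches one batch of rows and must finish within the rpc timeout. The previous timeout of 10 seconds follows from "doubled"; the previous batch size of 4096 is an assumption (it is the default as far as I know). The class and method names are made up for illustration.

```java
// Sketch only: rows per second a node must return so that one range request
// (one batch of rows) completes before the rpc timeout expires.
final class RangeRequestBudget {

    static double requiredRowsPerSecond(int rangeBatchSize, int rpcTimeoutSeconds) {
        if (rangeBatchSize <= 0 || rpcTimeoutSeconds <= 0) {
            throw new IllegalArgumentException("both arguments must be > 0");
        }
        return (double) rangeBatchSize / rpcTimeoutSeconds;
    }

    public static void main(String[] args) {
        // Assumed previous settings: batch of 4096 rows, 10 s timeout.
        double before = requiredRowsPerSecond(4096, 10);
        // Settings described above: batch of 2048 rows, 20 s timeout.
        double after = requiredRowsPerSecond(2048, 20);

        System.out.printf("before: %.1f rows/s per request%n", before);
        System.out.printf("after:  %.1f rows/s per request%n", after);
    }
}
```

Under these assumptions the per-request requirement drops by a factor of four, so timeouts that persist after the change point at something other than these two settings.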

Udo

On 29.04.2011 at 18:53, Jeremy Hanna wrote:

> It sounds like there might be some tuning you can do to your jobs - take a look at the wiki's HadoopSupport page, specifically the Troubleshooting section:
> http://wiki.apache.org/cassandra/HadoopSupport#Troubleshooting


Re: Experiences with Map&Reduce Stress Tests

Posted by Jeremy Hanna <je...@gmail.com>.
It sounds like there might be some tuning you can do to your jobs - take a look at the wiki's HadoopSupport page, specifically the Troubleshooting section:
http://wiki.apache.org/cassandra/HadoopSupport#Troubleshooting
