Posted to user@cassandra.apache.org by Takayuki Tsunakawa <ts...@jp.fujitsu.com> on 2010/10/22 04:45:53 UTC

[Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Hello,

I'm evaluating whether Cassandra fits a certain customer well. The
customer will collect petabytes of logs and analyze them. Could you
tell me if my understanding is correct and/or give me your opinions?
I'm sorry that the analysis requirement is not clear yet.

1. MapReduce behavior
I read the source code of Cassandra 0.6.x and understood that
jobtracker submits the map tasks to all Cassandra nodes, regardless of
whether the target keyspace's data reside there. That is, if there are
1,000 nodes in the Cassandra cluster, jobtracker sends more than 1,000
map tasks to all of the 1,000 nodes in parallel. If this is correct,
I'm afraid the startup time of a MapReduce job gets longer as more
nodes join the Cassandra cluster.
Is this correct?
With HBase, jobtracker submits map tasks only to the region servers
that hold the target data. This behavior is desirable because no
wasteful task submission occurs. Can you suggest cases where
Cassandra+MapReduce is better than HBase+MapReduce for log/sensor
analysis? (Please excuse me for not presenting the analysis
requirements.)

2. Data capacity
The excerpt from the paper about Amazon Dynamo says that the cluster
can scale to hundreds of nodes, not thousands. I understand Cassandra
is similar. Assuming that recent commodity servers have 2 to 4 TB
of disk, we need about 1,000 nodes or more to store petabytes of
data.
Is the present Cassandra suitable for petabytes of data? If not, is
any development in progress to increase the scalability?


--------------------------------------------------
Finally, Dynamo adopts a full membership model where each node is
aware of the data hosted by its peers. To do this, each node actively
gossips the full routing table with other nodes in the system. This
model works well for a system that contains couple of hundreds of
nodes. However, scaling such a design to run with tens of thousands of
nodes is not trivial because the overhead in maintaining the routing
table increases with the system size. This limitation might be
overcome by introducing hierarchical extensions to Dynamo. Also, note
that this problem is actively addressed by O(1) DHT systems (e.g.,
[14]).
--------------------------------------------------
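
To make the capacity arithmetic in point 2 concrete, here is a rough
back-of-the-envelope sketch in Java. The per-node disk size, replication
factor and especially the usable-disk fraction (headroom left for
compaction, commit log and OS) are my own illustrative assumptions, not
measurements or recommendations:

--------------------------------------------------
// Rough capacity estimate; all inputs are assumptions for illustration.
public class CapacityEstimate {
    public static void main(String[] args) {
        double logicalDataTB     = 1000.0; // 1 PB of logs to store
        int    replicationFactor = 3;      // RF used throughout this thread
        double diskPerNodeTB     = 4.0;    // raw disk per commodity node
        double usableFraction    = 0.5;    // headroom for compaction, commit log, OS

        double rawNeededTB     = logicalDataTB * replicationFactor;
        double usablePerNodeTB = diskPerNodeTB * usableFraction;
        long   nodes = (long) Math.ceil(rawNeededTB / usablePerNodeTB);

        System.out.printf("Raw data incl. replicas: %.0f TB%n", rawNeededTB);
        System.out.printf("Nodes needed at %.0f%% usable disk: %d%n",
                usableFraction * 100, nodes);
        // With these assumptions: 3000 TB / 2 TB usable per node = 1500 nodes.
    }
}
--------------------------------------------------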

Regards,
Takayuki Tsunakawa



Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Takayuki Tsunakawa <ts...@jp.fujitsu.com>.
Hello, Jonathan,

From: "Jonathan Ellis" <jb...@gmail.com>
> There is no reason Cassandra cannot scale to 1000s or more nodes with
> the current architecture.

Oh, really? I got the impression that the gossip exchanges limit the
number of nodes in a cluster when I read the Dynamo paper and
"Cassandra - A Decentralized Structured Storage System" written by
Avinash Lakshman. As I quoted in my first mail, Amazon says that
Dynamo is designed to scale to a couple of hundred nodes, not
thousands. In addition, the previously mentioned Cassandra paper says
the following (though this does not directly say that Cassandra does not
scale to more than a thousand nodes):

"Cassandra aims to run on top of an infrastructure of hundreds of
nodes (possibly spread across different data centers)."

Thank you so much for taking your precious time for me. I would
appreciate it if you could share your thoughts on any technical
challenges you recall that could cause difficulties in a cluster that
holds petabytes of data and has thousands of nodes.

Regards,
Takayuki Tsunakawa




Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Edward Capriolo <ed...@gmail.com>.
On Mon, Oct 25, 2010 at 10:19 PM, Takayuki Tsunakawa
<ts...@jp.fujitsu.com> wrote:
> Hello, Mike,
>
> Thank you for your advice. I'll close this thread with this mail. (I've been
> afraid I was interrupting the community developers with vague questions.)
> I'm happy to know that there is no clearly known limitation that restricts
> a cluster to a couple of hundred nodes. If our project starts with
> Cassandra and encounters any issues or interesting things, I'll report them here.
>
> Regards,
> Takayuki Tsunakawa
>
> From: Mike Malone
> Hey Takayuki,
>
> I don't think you're going to find anyone willing to promise that Cassandra
> will fit your petabyte scale data analysis problem. That's a lot of data,
> and there's not a ton of operational experience at that scale within the
> community. And the people who do work on that sort of problem tend to be
> busy ;). If your problem is that big, you're probably going to need to do
> some experimentation and see if the system will scale for you. I'm sure
> someone here can answer any specific questions that may come up if you do
> that sort of work.
>
> As you mentioned, the first concern I'd have with a cluster that big is
> whether gossip will scale. I'd suggest taking a look at the gossip code.
> Cassandra nodes are "omniscient" in the sense that they all try to maintain
> full ring state for the entire cluster. At a certain cluster size that no
> longer works.
>
> My best guess is that a cluster of 1000 machines would be fine. Maybe even
> an order of magnitude bigger than that. I could be completely wrong, but
> given the low overhead that I've observed that estimate seems reasonable. If
> you do find that gossip won't work in your situation it would be interesting
> to hear why. You may even consider modifying / updating gossip to work for
> you. The code isn't as scary as it may seem. At that scale it's likely
> you'll encounter bugs and corner cases that other people haven't, so it's
> probably worth familiarizing yourself with the code anyways if you decide to
> use Cassandra.
>
> Mike
>

I miscommunicated my idea. I was not describing the time to compute
splits. I was describing how it takes me 5 minutes to start a
Cassandra node with 300 GB of data and large indexes caused by small
rows.

As for statistics on join times, I do not have them. Intensive
operations like compactions and joins get absorbed by large clusters.
By this I mean that if you have 100 nodes, adding the 101st node has a
small impact on the cluster at large.
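
To put a number on "small impact", here is a tiny back-of-the-envelope
sketch. It assumes the bootstrapping node takes over roughly half of one
existing node's range (the usual effect of the default token selection);
that is an assumption, not a guarantee for every topology:

--------------------------------------------------
// Rough impact of adding one node to an N-node cluster (illustrative only).
public class JoinImpact {
    public static void main(String[] args) {
        int    nodes      = 100;   // cluster size before the join
        double perNodeGB  = 300.0; // data per node, as in the example above

        double streamedGB    = perNodeGB / 2.0;        // ~half of one node's data
        double clusterGB     = nodes * perNodeGB;
        double fractionMoved = streamedGB / clusterGB; // ~1 / (2 * N)

        System.out.printf("Data streamed to the new node: ~%.0f GB%n", streamedGB);
        System.out.printf("Share of total cluster data moved: ~%.2f%%%n",
                fractionMoved * 100);
        // ~150 GB streamed, about 0.5% of the cluster's data with these inputs.
    }
}
--------------------------------------------------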

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Takayuki Tsunakawa <ts...@jp.fujitsu.com>.
Hello, Mike,

Thank you for your advice. I'll close this thread with this mail. (I've been
afraid I was interrupting the community developers with vague questions.)
I'm happy to know that there is no clearly known limitation that restricts
a cluster to a couple of hundred nodes. If our project starts with
Cassandra and encounters any issues or interesting things, I'll report them here.

Regards,
Takayuki Tsunakawa


From: Mike Malone
Hey Takayuki,


I don't think you're going to find anyone willing to promise that Cassandra
will fit your petabyte scale data analysis problem. That's a lot of data,
and there's not a ton of operational experience at that scale within the
community. And the people who do work on that sort of problem tend to be
busy ;). If your problem is that big, you're probably going to need to do
some experimentation and see if the system will scale for you. I'm sure
someone here can answer any specific questions that may come up if you do
that sort of work.


As you mentioned, the first concern I'd have with a cluster that big is
whether gossip will scale. I'd suggest taking a look at the gossip code.
Cassandra nodes are "omniscient" in the sense that they all try to maintain
full ring state for the entire cluster. At a certain cluster size that no
longer works.


My best guess is that a cluster of 1000 machines would be fine. Maybe even
an order of magnitude bigger than that. I could be completely wrong, but
given the low overhead that I've observed that estimate seems reasonable. If
you do find that gossip won't work in your situation it would be interesting
to hear why. You may even consider modifying / updating gossip to work for
you. The code isn't as scary as it may seem. At that scale it's likely
you'll encounter bugs and corner cases that other people haven't, so it's
probably worth familiarizing yourself with the code anyways if you decide to
use Cassandra.


Mike

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Mike Malone <mi...@simplegeo.com>.
Hey Takayuki,

I don't think you're going to find anyone willing to promise that Cassandra
will fit your petabyte scale data analysis problem. That's a lot of data,
and there's not a ton of operational experience at that scale within the
community. And the people who do work on that sort of problem tend to be
busy ;). If your problem is that big, you're probably going to need to do
some experimentation and see if the system will scale for you. I'm sure
someone here can answer any specific questions that may come up if you do
that sort of work.

As you mentioned, the first concern I'd have with a cluster that big is
whether gossip will scale. I'd suggest taking a look at the gossip code.
Cassandra nodes are "omniscient" in the sense that they all try to maintain
full ring state for the entire cluster. At a certain cluster size that no
longer works.
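
To illustrate why that full-membership model is the thing to watch, here is
a very rough sizing of the ring state every node keeps. Both numbers are
assumptions for illustration, not measurements of Cassandra's actual gossip
payload:

--------------------------------------------------
// Very rough sizing of the full-membership ("omniscient") ring state.
public class GossipStateEstimate {
    public static void main(String[] args) {
        int clusterSize      = 1000; // nodes in the ring
        int bytesPerEndpoint = 300;  // token, address, status, load, heartbeat (assumption)

        long ringStateBytes = (long) clusterSize * bytesPerEndpoint;
        System.out.printf("Full-membership state per node: ~%d KB%n",
                ringStateBytes / 1024);
        // ~300 KB at 1,000 nodes, ~3 MB at 10,000. The state each node keeps
        // (and the work to keep it fresh) grows linearly with cluster size,
        // which is exactly the concern raised in the Dynamo excerpt earlier
        // in this thread.
    }
}
--------------------------------------------------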

My best guess is that a cluster of 1000 machines would be fine. Maybe even
an order of magnitude bigger than that. I could be completely wrong, but
given the low overhead that I've observed that estimate seems reasonable. If
you do find that gossip won't work in your situation it would be interesting
to hear why. You may even consider modifying / updating gossip to work for
you. The code isn't as scary as it may seem. At that scale it's likely
you'll encounter bugs and corner cases that other people haven't, so it's
probably worth familiarizing yourself with the code anyways if you decide to
use Cassandra.

Mike

On Tue, Oct 26, 2010 at 1:09 AM, Takayuki Tsunakawa <
tsunakawa.takay@jp.fujitsu.com> wrote:

> Hello, Edward,
>
> Thank you for giving me insight about large disk nodes.
>
> From: "Edward Capriolo" <ed...@gmail.com>
> > Index sampling on start up. If you have very small rows your indexes
> > become large. These have to be sampled on start up and sampling our
> > indexes for 300 GB of data can take 5 minutes. This is going to be
> > optimized soon.
>
> 5 minutes for 300 GB of data ... it's not cheap, is it? By simple extrapolation, 3 TB of
> data will lead to 50 minutes just for computing input splits. This is
> too expensive when I want only part of the 3 TB of data.
>
>
> > (Just wanted to note some of this as I am in the middle of a process
> > of joining a node now :)
>
> Good luck. I'd appreciate it if you could share some performance numbers for
> joining nodes (amount of data, time to distribute data, load impact on
> applications, etc) if you can. The cluster our customer is thinking of
> is likely to become very large, so I'm interested in the elasticity.
> Yahoo!'s YCSB report makes me worry about adding nodes.
>
> Regards,
> Takayuki Tsunakawa
>
>
> From: "Edward Capriolo" <ed...@gmail.com>
> [Q3]
> There are some challenges with very large disk nodes.
> Caveats:
> I will use words like "long", "slow", and "large" relatively. If you
> have great equipment, i.e., 10G Ethernet between nodes, it will not take
> "long" to transfer data. If you have an insane disk pack it may not
> take "long" to compact 200 GB of data. I am basing these statements on
> server-class hardware: ~32 GB RAM, ~2 processors, ~6-disk SAS RAID.
>
> Index sampling on start up. If you have very small rows your indexes
> become large. These have to be sampled on start up and sampling our
> indexes for 300 GB of data can take 5 minutes. This is going to be
> optimized soon.
>
> Joining nodes: When you go with larger systems joining a new node
> involves a lot of transfer, and can take a "long" time.  Node join
> process is going to be optimized in 0.7 and 0.8 (quite drastic changes
> in 0.7)
>
> Major compaction and very large normal compaction can take a "long"
> time. For example while doing a 200 GB compaction that takes 30
> minutes, other sstables build up, more sstables mean "slower" reads.
>
> Achieving a high RAM/DISK ratio may be easier with smaller nodes vs
> one big node with 128 GB RAM $$$.
>
> As Jonathan pointed out nothing technically is stopping larger disk
> nodes.
>
> (Just wanted to note some of this as I am in the middle of a process
> of joining a node now :)
>
>
>

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Takayuki Tsunakawa <ts...@jp.fujitsu.com>.
Hello, Edward,

Thank you for giving me insight about large disk nodes.

From: "Edward Capriolo" <ed...@gmail.com>
> Index sampling on start up. If you have very small rows your indexes
> become large. These have to be sampled on start up and sampling our
> indexes for 300 GB of data can take 5 minutes. This is going to be
> optimized soon.

5 minutes for 300 GB of data ... it's not cheap, is it? By simple extrapolation, 3 TB of
data will lead to 50 minutes just for computing input splits. This is
too expensive when I want only part of the 3 TB of data.


> (Just wanted to note some of this as I am in the middle of a process
> of joining a node now :)

Good luck. I'd appreciate it if you could share some performance numbers for
joining nodes (amount of data, time to distribute data, load impact on
applications, etc) if you can. The cluster our customer is thinking of
is likely to become very large, so I'm interested in the elasticity.
Yahoo!'s YCSB report makes me worry about adding nodes.

Regards,
Takayuki Tsunakawa


From: "Edward Capriolo" <ed...@gmail.com>
[Q3]
There are some challenges with very large disk nodes.
Caveats:
I will use words like "long", "slow", and "large" relatively. If you
have great equipment, i.e., 10G Ethernet between nodes, it will not take
"long" to transfer data. If you have an insane disk pack it may not
take "long" to compact 200 GB of data. I am basing these statements on
server-class hardware: ~32 GB RAM, ~2 processors, ~6-disk SAS RAID.

Index sampling on start up. If you have very small rows your indexes
become large. These have to be sampled on start up and sampling our
indexes for 300 GB of data can take 5 minutes. This is going to be
optimized soon.

Joining nodes: When you go with larger systems joining a new node
involves a lot of transfer, and can take a "long" time.  Node join
process is going to be optimized in 0.7 and 0.8 (quite drastic changes
in 0.7)

Major compaction and very large normal compaction can take a "long"
time. For example while doing a 200 GB compaction that takes 30
minutes, other sstables build up, more sstables mean "slower" reads.

Achieving a high RAM/DISK ratio may be easier with smaller nodes vs
one big node with 128 GB RAM $$$.

As Jonathan pointed out nothing technically is stopping larger disk
nodes.

(Just wanted to note some of this as I am in the middle of a process
of joining a node now :)



Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Edward Capriolo <ed...@gmail.com>.
On Mon, Oct 25, 2010 at 12:37 PM, Jonathan Ellis <jb...@gmail.com> wrote:
> On Sun, Oct 24, 2010 at 9:09 PM, Takayuki Tsunakawa
> <ts...@jp.fujitsu.com> wrote:
>> From: "Jonathan Ellis" <jb...@gmail.com>
>>> (b) Cassandra generates input splits from the sampling of keys each
>>> node has in memory.  So if a node does end up with no data for a
>>> keyspace (because of bad OOP balancing for instance) it will have no
>>> splits generated or mapped.
>>
>> I understood you are referring to StorageService.getSplits(). This
>> seems to filter out the Cassandra nodes which have no data for the
>> target (keyspace, column family) pair.
>
> Right.
>
>> [Q1]
>> I understood that ColumnFamilyInputFormat requests the above node (or
>> split) filtering to all nodes in the cluster. Is this correct?
>
> Yes.
>
>> [Q2]
>> If Q1 is yes, more nodes result in higher cost of MapReduce job
>> startup (for executing InputFormat.getSplits()).
>
> Not really:
>  1) Each node has a sample of its keys in memory all the time for the
> SSTable row indexes.  So getSplits() is a purely in-JVM-memory
> operation.
>  2) Each node computes its own splits.  Adding more nodes does not change this.
>
>> [Q3-1]
>> How much data is aimed at by the 400 node cluster Riptano is planning?
>> If each node has 4 TB of disks and the replication factor is 3, the
>> simple calculation shows 4 TB * 400 / 3 = 533 TB (ignoring commit log,
>> OS areas, etc).
>
> We do not yet have permission to talk about details of this cluster, sorry.
>
>> [Q3-2]
>> Based on the current architecture, how many nodes is the limit and how
>> much (approximate) data is the practical limit?
>
> There is no reason Cassandra cannot scale to 1000s or more nodes with
> the current architecture.
>
> --
> Jonathan Ellis
> Project Chair, Apache Cassandra
> co-founder of Riptano, the source for professional Cassandra support
> http://riptano.com
>

[Q3]
There are some challenges with very large disk nodes.
Caveats:
I will use words like "long", "slow", and "large" relatively. If you
have great equipment, i.e., 10G Ethernet between nodes, it will not take
"long" to transfer data. If you have an insane disk pack it may not
take "long" to compact 200 GB of data. I am basing these statements on
server-class hardware: ~32 GB RAM, ~2 processors, ~6-disk SAS RAID.

Index sampling on start up. If you have very small rows your indexes
become large. These have to be sampled on start up and sampling our
indexes for 300 GB of data can take 5 minutes. This is going to be
optimized soon.
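
To show why small rows make this expensive, here is a rough estimate of how
many index samples a node has to read back at startup. The average row size
and the 1-in-128 sampling interval are assumptions (I believe 128 was the
default at the time, but check your version):

--------------------------------------------------
// Rough estimate of startup index sampling work (illustrative assumptions).
public class IndexSamplingEstimate {
    public static void main(String[] args) {
        double dataGB        = 300.0;
        long   avgRowBytes   = 1024;  // "very small rows" (assumption)
        int    indexInterval = 128;   // keys per index sample (assumed default)

        long rows    = (long) (dataGB * 1024 * 1024 * 1024) / avgRowBytes;
        long samples = rows / indexInterval;

        System.out.printf("Rows on the node: ~%,d%n", rows);
        System.out.printf("Index samples read at startup: ~%,d%n", samples);
        // ~315 million rows -> ~2.5 million sampled keys to read and deserialize
        // before the node can serve traffic; larger rows shrink both numbers.
    }
}
--------------------------------------------------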

Joining nodes: When you go with larger systems joining a new node
involves a lot of transfer, and can take a "long" time.  Node join
process is going to be optimized in 0.7 and 0.8 (quite drastic changes
in 0.7)

Major compaction and very large normal compaction can take a "long"
time. For example while doing a 200 GB compaction that takes 30
minutes, other sstables build up, more sstables mean "slower" reads.

Achieving a high RAM/DISK ratio may be easier with smaller nodes vs
one big node with 128 GB RAM $$$.

As Jonathan pointed out nothing technically is stopping larger disk nodes.

(Just wanted to note some of this as I am in the middle of a process
of joining a node now :)

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Jonathan Ellis <jb...@gmail.com>.
On Sun, Oct 24, 2010 at 9:09 PM, Takayuki Tsunakawa
<ts...@jp.fujitsu.com> wrote:
> From: "Jonathan Ellis" <jb...@gmail.com>
>> (b) Cassandra generates input splits from the sampling of keys each
>> node has in memory.  So if a node does end up with no data for a
>> keyspace (because of bad OOP balancing for instance) it will have no
>> splits generated or mapped.
>
> I understood you are referring to StorageService.getSplits(). This
> seems to filter out the Cassandra nodes which have no data for the
> target (keyspace, column family) pair.

Right.

> [Q1]
> I understood that ColumnFamilyInputFormat requests the above node (or
> split) filtering to all nodes in the cluster. Is this correct?

Yes.

> [Q2]
> If Q1 is yes, more nodes result in higher cost of MapReduce job
> startup (for executing InputFormat.getSplits()).

Not really:
 1) Each node has a sample of its keys in memory all the time for the
SSTable row indexes.  So getSplits() is a purely in-JVM-memory
operation.
 2) Each node computes its own splits.  Adding more nodes does not change this.

> [Q3-1]
> How much data is aimed at by the 400 node cluster Riptano is planning?
> If each node has 4 TB of disks and the replication factor is 3, the
> simple calculation shows 4 TB * 400 / 3 = 533 TB (ignoring commit log,
> OS areas, etc).

We do not yet have permission to talk about details of this cluster, sorry.

> [Q3-2]
> Based on the current architecture, how many nodes is the limit and how
> much (approximate) data is the practical limit?

There is no reason Cassandra cannot scale to 1000s or more nodes with
the current architecture.

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Takayuki Tsunakawa <ts...@jp.fujitsu.com>.
Hello, Jonathan,

Thank you for your kind reply. Could you give me some more
opinions/comments?


From: "Jonathan Ellis" <jb...@gmail.com>
> (b) Cassandra generates input splits from the sampling of keys each
> node has in memory.  So if a node does end up with no data for a
> keyspace (because of bad OOP balancing for instance) it will have no
> splits generated or mapped.

I understood you are referring to StorageService.getSplits(). This
seems to filter out the Cassandra nodes which have no data for the
target (keyspace, column family) pair.

[Q1]
I understood that ColumnFamilyInputFormat requests the above node (or
split) filtering to all nodes in the cluster. Is this correct?

[Q2]
If Q1 is yes, more nodes result in higher cost of MapReduce job
startup (for executing InputFormat.getSplits()). Do you have any
performance numbers about this startup cost (time)? I'd like to know
how high it is when the cluster consists of hundreds of nodes.


[Q3]
Going back to my first mail, I'm wondering if the present Cassandra is
applicable to the analysis of petabytes of data.
[Q3-1]
How much data is aimed at by the 400 node cluster Riptano is planning?
If each node has 4 TB of disks and the replication factor is 3, the
simple calculation shows 4 TB * 400 / 3 = 533 TB (ignoring commit log,
OS areas, etc).
[Q3-2]
Based on the current architecture, how many nodes is the limit and how
much (approximate) data is the practical limit?


Regards,
Takayuki Tsunakawa







Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Jonathan Ellis <jb...@gmail.com>.
On Fri, Oct 22, 2010 at 3:30 AM, Takayuki Tsunakawa
<ts...@jp.fujitsu.com> wrote:
> Yes, I meant one map task would be sent to each task tracker, resulting in
> 1,000 concurrent map tasks in the cluster. ColumnFamilyInputFormat cannot
> identify the nodes that actually hold some data, so the job tracker will
> send the map tasks to all of the 1,000 nodes. This is wasteful and
> time-consuming if only 200 nodes hold some data for a keyspace.

(a) Normally all data from each keyspace is spread around each node in
the cluster.  This is what you want for best parallelism.

(b) Cassandra generates input splits from the sampling of keys each
node has in memory.  So if a node does end up with no data for a
keyspace (because of bad OOP balancing for instance) it will have no
splits generated or mapped.
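
A minimal conceptual sketch of the behaviour described in (b); the types and
helper below are hypothetical, written only to illustrate the idea, and are
not Cassandra's actual classes:

--------------------------------------------------
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: a token range with the size of the node's
// in-memory key sample for that range.
class TokenRange {
    final String start, end;
    final long sampledKeys;
    TokenRange(String start, String end, long sampledKeys) {
        this.start = start; this.end = end; this.sampledKeys = sampledKeys;
    }
}

// Hypothetical stand-in for a Hadoop input split bound to one node.
class SplitSketch {
    final String startToken, endToken, location;
    SplitSketch(String s, String e, String loc) {
        startToken = s; endToken = e; location = loc;
    }
}

public class SplitGenerationSketch {
    // Runs per node over the ranges it owns: a range with no sampled keys
    // yields no split, so a node holding no data for the column family
    // contributes no map tasks to the job.
    static List<SplitSketch> localSplits(String nodeAddress, List<TokenRange> owned) {
        List<SplitSketch> splits = new ArrayList<SplitSketch>();
        for (TokenRange r : owned) {
            if (r.sampledKeys == 0) continue; // nothing stored here
            splits.add(new SplitSketch(r.start, r.end, nodeAddress));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<TokenRange> owned = Arrays.asList(
                new TokenRange("0", "42", 15000), // has data -> one split
                new TokenRange("42", "85", 0));   // empty -> filtered out
        System.out.println(localSplits("10.0.0.1", owned).size() + " split(s) generated");
    }
}
--------------------------------------------------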

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Aaron Morton <aa...@thelastpickle.com>.
I may be wrong about which nodes the task is sent to.  

Others here know more about hadoop integration.

Aaron
  

On 22 Oct 2010, at 21:30, Takayuki Tsunakawa <ts...@jp.fujitsu.com> wrote:

> Hello, Aaron,
>  
> Thank you for much info (especially pointers that seem interesting).
>  
> > So you would not have 1,000 tasks sent to each of the 1,000 cassandra nodes.
>  
> Yes, I meant one map task would be sent to each task tracker, resulting in 1,000 concurrent map tasks in the cluster. ColumnFamilyInputFormat cannot identify the nodes that actually hold some data, so the job tracker will send the map tasks to all of the 1,000 nodes. This is wasteful and time-consuming if only 200 nodes hold some data for a keyspace.
>  
> > When the task runs on the cassandra node it will iterate through all of the rows in the specified ColumnFamily with keys in the Token range the Node is responsible for.
>  
> I hope the ColumnFamilyInputFormat will allow us to set KeyRange to select rows passed to map.
>  
> I'll read the web pages you gave me. Thank you.
> All, any other advice and comment is appreciated.
>  
> Regards,
> Takayuki Tsunakawa
>  
> ----- Original Message ----- 
> From: aaron morton 
> To: user@cassandra.apache.org 
> Sent: Friday, October 22, 2010 4:05 PM
> Subject: Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data
>  
> 
> For plain old log analysis the Cloudera Hadoop distribution may be a better match. Flume is designed to help with streaming data into HDFS, the LZO compression extensions would help with the data size and Pig would make the analysis easier (IMHO).
> http://www.cloudera.com/hadoop/
> http://www.cloudera.com/blog/2010/09/using-flume-to-collect-apache-2-web-server-logs/
> http://www.cloudera.com/blog/2009/11/hadoop-at-twitter-part-1-splittable-lzo-compression/
>  
> 
> I'll try to answer your questions, others please jump in if I'm wrong.
>  
> 
> 1. Data in a keyspace will be distributed to all nodes in the cassandra cluster. AFAIK the Job Tracker should only send one task to each task tracker, and normally you would have a task tracker running on each cassandra node. The task tracker can then throttle how many concurrent tasks can run. So you would not have 1,000 tasks sent to each of the 1,000 cassandra nodes.
>  
> 
> When the task runs on the cassandra node it will iterate through all of the rows in the specified ColumnFamily with keys in the Token range the Node is responsible for. If cassandra is using the RandomPartitioner, data will be spread around the cluster. So, for example, a Map-Reduce job that only wants to read the last week's data may have to read from every node. Obviously this depends on how the data is broken up between rows / columns.
>  
>  
>  
> 
> 2. Some of the other people from riptano.com or rackspace may be able to help with Cassandra's outer limits. There is a 400 node cluster planned http://www.riptano.com/blog/riptano-and-digital-reasoning-form-partnership
>  
> 
> Hope that helps. 
> Aaron

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by Takayuki Tsunakawa <ts...@jp.fujitsu.com>.
Hello, Aaron,

Thank you for all the info (especially the pointers, which look interesting).

> So you would not have 1,000 tasks sent to each of the 1,000 cassandra
> nodes.

Yes, I meant one map task would be sent to each task tracker, resulting in
1,000 concurrent map tasks in the cluster. ColumnFamilyInputFormat cannot
identify the nodes that actually hold some data, so the job tracker will
send the map tasks to all of the 1,000 nodes. This is wasteful and
time-consuming if only 200 nodes hold some data for a keyspace.

> When the task runs on the cassandra node it will iterate through all of
> the rows in the specified ColumnFamily with keys in the Token range the Node
> is responsible for.

I hope the ColumnFamilyInputFormat will allow us to set KeyRange to select
rows passed to map.

I'll read the web pages you gave me. Thank you.
All, any other advice and comments are appreciated.

Regards,
Takayuki Tsunakawa

----- Original Message ----- 
From: aaron morton
To: user@cassandra.apache.org
Sent: Friday, October 22, 2010 4:05 PM
Subject: Re: [Q] MapReduce behavior and Cassandra's scalability for
petabytes of data


For plain old log analysis the Cloudera Hadoop distribution may be a better
match. Flume is designed to help with streaming data into HDFS, the LZO
compression extensions would help with the data size and Pig would make the
analysis easier (IMHO).
http://www.cloudera.com/hadoop/
http://www.cloudera.com/blog/2010/09/using-flume-to-collect-apache-2-web-server-logs/
http://www.cloudera.com/blog/2009/11/hadoop-at-twitter-part-1-splittable-lzo-compression/


I'll try to answer your questions, others please jump in if I'm wrong.


1. Data in a keyspace will be distributed to all nodes in the cassandra
cluster. AFAIK the Job Tracker should only send one task to each task
tracker, and normally you would have a task tracker running on each
cassandra node. The task tracker can then throttle how many concurrent tasks
can run. So you would not have 1,000 tasks sent to each of the 1,000
cassandra nodes.


When the task runs on the cassandra node it will iterate through all of the
rows in the specified ColumnFamily with keys in the Token range the Node is
responsible for. If cassandra is using the RandomPartitioner, data will be
spread around the cluster. So, for example, a Map-Reduce job that only wants
to read the last week's data may have to read from every node. Obviously this
depends on how the data is broken up between rows / columns.




2. Some of the other people from riptano.com or rackspace may be able to
help with Cassandra's outer limits. There is a 400 node cluster planned
http://www.riptano.com/blog/riptano-and-digital-reasoning-form-partnership


Hope that helps.
Aaron

Re: [Q] MapReduce behavior and Cassandra's scalability for petabytes of data

Posted by aaron morton <aa...@thelastpickle.com>.
For plain old log analysis the Cloudera Hadoop distribution may be a better match. Flume is designed to help with streaming data into HDFS, the LZO compression extensions would help with the data size and Pig would make the analysis easier (IMHO).
http://www.cloudera.com/hadoop/
http://www.cloudera.com/blog/2010/09/using-flume-to-collect-apache-2-web-server-logs/
http://www.cloudera.com/blog/2009/11/hadoop-at-twitter-part-1-splittable-lzo-compression/

I'll try to answer your questions, others please jump in if I'm wrong.

1. Data in a keyspace will be distributed to all nodes in the cassandra cluster. AFAIK the Job Tracker should only send one task to each task tracker, and normally you would have a task tracker running on each cassandra node. The task tracker can then throttle how many concurrent tasks can run. So you would not have 1,000 tasks sent to each of the 1,000 cassandra nodes.

When the task runs on the cassandra node it will iterate through all of the rows in the specified ColumnFamily with keys in the Token range the Node is responsible for. If cassandra is using the RandomPartitioner, data will be spread around the cluster. So, for example, a Map-Reduce job that only wants to read the last week's data may have to read from every node. Obviously this depends on how the data is broken up between rows / columns.
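
To make this concrete, below is a minimal sketch of how a Hadoop job can be
wired to read a column family through ColumnFamilyInputFormat, loosely
modelled on the contrib/word_count example that ships with Cassandra 0.6.x.
The keyspace ("Logs"), column family ("Raw"), column name ("message") and
output path are placeholders, and the exact mapper/ConfigHelper signatures
may differ between versions, so treat this as a sketch rather than
copy-and-paste code:

--------------------------------------------------
import java.util.Arrays;
import java.util.SortedMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogCountJob {

    // In 0.6.x each map call receives one row: its key and the selected columns.
    public static class LogMapper
            extends Mapper<String, SortedMap<byte[], IColumn>, Text, LongWritable> {
        @Override
        protected void map(String key, SortedMap<byte[], IColumn> columns, Context ctx)
                throws java.io.IOException, InterruptedException {
            // Trivial placeholder for real log analysis: count rows seen per split.
            ctx.write(new Text("rows"), new LongWritable(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(LogCountJob.class);
        job.setMapperClass(LogMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Read from Cassandra. Cluster contact details (seed host, Thrift port)
        // are picked up from the Cassandra configuration available to the job;
        // how that is supplied differs between versions.
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setColumnFamily(job.getConfiguration(), "Logs", "Raw");
        SlicePredicate predicate = new SlicePredicate();
        predicate.setColumn_names(Arrays.asList("message".getBytes()));
        ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

        FileOutputFormat.setOutputPath(job, new Path("/tmp/logcount"));
        job.waitForCompletion(true);
    }
}
--------------------------------------------------

Because a task tracker normally runs next to each Cassandra node, each map
task ends up iterating only the token range its local node owns, as
described above.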


2. Some of the other people from riptano.com or rackspace may be able to help with Cassandra's outer limits. There is a 400 node cluster planned http://www.riptano.com/blog/riptano-and-digital-reasoning-form-partnership

Hope that helps. 
Aaron

On 22 Oct 2010, at 15:45, Takayuki Tsunakawa wrote:

> Hello,
> 
> I'm evaluating whether Cassandra fits a certain customer well. The
> customer will collect petabytes of logs and analyze them. Could you
> tell me if my understanding is correct and/or give me your opinions?
> I'm sorry that the analysis requirement is not clear yet.
> 
> 1. MapReduce behavior
> I read the source code of Cassandra 0.6.x and understood that
> jobtracker submits the map tasks to all Cassandra nodes, regardless of
> whether the target keyspace's data reside there. That is, if there are
> 1,000 nodes in the Cassandra cluster, jobtracker sends more than 1,000
> map tasks to all of the 1,000 nodes in parallel. If this is correct,
> I'm afraid the startup time of a MapReduce job gets longer as more
> nodes join the Cassandra cluster.
> Is this correct?
> With HBase, jobtracker submits map tasks only to the region servers
> that hold the target data. This behavior is desirable because no
> wasteful task submission is done. Can you suggest the cases where
> Cassandra+MapReduce is better than HBase+MapReduce for log/sensor
> analysis? (Please excuse me for my not presenting the analysis
> requirement).
> 
> 2. Data capacity
> The excerpt from the paper about Amazon Dynamo says that the cluster
> can scale to hundreds of nodes, not thousands. I understand Cassandra
> is similar. Assuming that the recent commodity servers have 2 to 4 TB
> of disks, we need about 1,000 nodes or more to store petabytes of
> data.
> Is the present Cassandra suitable for petabytes of data? If not, is
> any development in progress to increase the scalability?
> 
> 
> --------------------------------------------------
> Finally, Dynamo adopts a full membership model where each node is
> aware of the data hosted by its peers. To do this, each node actively
> gossips the full routing table with other nodes in the system. This
> model works well for a system that contains couple of hundreds of
> nodes. However, scaling such a design to run with tens of thousands of
> nodes is not trivial because the overhead in maintaining the routing
> table increases with the system size. This limitation might be
> overcome by introducing hierarchical extensions to Dynamo. Also, note
> that this problem is actively addressed by O(1) DHT systems(e.g.,
> [14]).
> --------------------------------------------------
> 
> Regards,
> Takayuki Tsunakawa
> 
>