Posted to user@cassandra.apache.org by Mark Schnitzius <ma...@cxense.com> on 2010/05/05 05:53:29 UTC

Updating (as opposed to just setting) Cassandra data via Hadoop

I have a situation where I need to accumulate values in Cassandra on an
ongoing basis.  Atomic increments are still in the works apparently (see
https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time being
I'll be using Hadoop, and attempting to feed in both the existing values and
the new values to a M/R process where they can be combined together and
written back out to Cassandra.

The approach I'm taking is to use Hadoop's CombineFileInputFormat to blend
the existing data (using Cassandra's ColumnFamilyInputFormat) with the newly
incoming data (using something like Hadoop's SequenceFileInputFormat).

I was just wondering, has anyone here tried this, and were there issues?
 I'm worried because the CombineFileInputFormat has restrictions around
splits being from different pools so I don't know how this will play out
with data from both Cassandra and HDFS.  The other option, I suppose, is to
use a separate M/R process to replicate the data onto HDFS first, but I'd
rather avoid the extra step and duplication of storage.

Also, if you've tackled a similar situation in the past using a different
approach, I'd be keen to hear about it...


Thanks
Mark

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Jesse McConnell <je...@gmail.com>.
Does anyone have a feel for how performant M/R operations are when
backed by Cassandra as opposed to HDFS, in terms of network utilization
and the volume of data being pushed around?

jesse

--
jesse mcconnell
jesse.mcconnell@gmail.com



On Fri, May 7, 2010 at 08:54, Ian Kallen <sp...@gmail.com> wrote:
> On 5/6/10 3:26 PM, Stu Hood wrote:
>>
>> Ian: I think that as get_range_slice gets faster, the approach that Mark
>> was heading toward may be considerably more efficient than reading the old
>> value in the OutputFormat.
>>
>
> Interesting, I'm trying to understand the performance impact of the
> different ways to do this. Under Mark's approach, the prior values are
> pulled out of Cassandra in the mapper in bulk, then merged and written back
> to Cassandra in the reducer; the get_range_slice is faster than the
> individual row fetches that my approach does in the reducer. Is that what
> you mean or are you referring to something else?
> thanks!
> -Ian
>
> --
> Ian Kallen
> blog: http://www.arachna.com/roller/spidaman
> tweetz: http://twitter.com/spidaman
> vox: 925.385.8426
>
>
>

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Ian Kallen <sp...@gmail.com>.
On 5/6/10 3:26 PM, Stu Hood wrote:
> Ian: I think that as get_range_slice gets faster, the approach that Mark was heading toward may be considerably more efficient than reading the old value in the OutputFormat.
>    
Interesting, I'm trying to understand the performance impact of the 
different ways to do this. Under Mark's approach, the prior values are 
pulled out of Cassandra in the mapper in bulk, then merged and written 
back to Cassandra in the reducer; the get_range_slice is faster than the 
individual row fetches that my approach does in the reducer. Is that 
what you mean or are you referring to something else?
thanks!
-Ian

-- 
Ian Kallen
blog: http://www.arachna.com/roller/spidaman
tweetz: http://twitter.com/spidaman
vox: 925.385.8426



Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Brandon Williams <dr...@gmail.com>.
+1 for pig

-Brandon

On May 6, 2010 6:49 PM, "Stu Hood" <st...@rackspace.com> wrote:

Agreed.

For the massaging, may I recommend using Pig? It is fantastic for unioning
and reformatting datasets like these.


-----Original Message-----
From: "Mark Schnitzius" <ma...@cxense.com>
Sent: Thursday, May...

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Stu Hood <st...@rackspace.com>.
Agreed.

For the massaging, may I recommend using Pig? It is fantastic for unioning and reformatting datasets like these.

-----Original Message-----
From: "Mark Schnitzius" <ma...@cxense.com>
Sent: Thursday, May 6, 2010 6:36pm
To: user@cassandra.apache.org
Subject: Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Yes, I think this approach would be more efficient, but Ian's point about
failed runs is well taken.  It is still a problem with this approach.  I may
have to introduce a scheme where Hadoop's output is written to a new column
family, and then some sort of pointer is updated to point to this column
family at the end of a successful run.  That way, if a run failed, the
pointer would still point to the old data, and we could just rerun the
update task.  (Older and failed versions of the data would have to be
periodically purged as well.)

The other difficulty with this approach is massaging the existing Cassandra
data (which will come in as <String, SortedMap<byte[], IColumn>>) to look
like whatever the other input stream looks like, since a Hadoop mapper can
only take one key/value type.  Some sort of wrapper class should work, but
the input streams might even have different granularities (i.e. each entry
in the SortedMap<byte[], IColumn> map might correspond to a single input
row from the other input stream).

I'll put something on the wiki if I can make it work...


Cheers
Mark


On Fri, May 7, 2010 at 8:26 AM, Stu Hood <st...@rackspace.com> wrote:

> Ian: I think that as get_range_slice gets faster, the approach that Mark
> was heading toward may be considerably more efficient than reading the old
> value in the OutputFormat.
>
> Mark: Reading all of the data you want to update out of Cassandra using the
> InputFormat, merging it with (tagged) new data, and then performing
> deletes/inserts (without reads) in a Cassandra OutputFormat makes a lot of
> sense to me. It gives you much better data locality: all reads happen in a
> streaming fashion (mostly: this is where we need improvement for
> get_range_slices) at the beginning of the job.
>
>
> -----Original Message-----
> From: "Ian Kallen" <sp...@gmail.com>
> Sent: Thursday, May 6, 2010 5:14pm
> To: user@cassandra.apache.org
> Subject: Re: Updating (as opposed to just setting) Cassandra data via
> Hadoop
>
> I have inputs that are text logs and I wrote a Cassandra OutputFormat, the
> reducers read the old values from their respective column families,
> increment the counts and write back the new values. Since all of the writes
> are done by the hadoop jobs and we're not running multiple jobs
> concurrently, the writes aren't clobbering any values except the ones from
> the prior hadoop run. If/when atomic increments are available, we'd be able
> to run concurrent log processing jobs, but for now, this seems to work. I
> think the biggest risk is that a reduce task fails, hadoop restarts it and
> the replacement task re-increments the values. We're going to wait and see
> to determine how much of a problem this is in practice.
> -Ian
>
> On Tue, May 4, 2010 at 8:53 PM, Mark Schnitzius
> <ma...@cxense.com>wrote:
>
> > I have a situation where I need to accumulate values in Cassandra on an
> > ongoing basis.  Atomic increments are still in the works apparently (see
> > https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time
> being
> > I'll be using Hadoop, and attempting to feed in both the existing values
> and
> > the new values to a M/R process where they can be combined together and
> > written back out to Cassandra.
> >
> > The approach I'm taking is to use Hadoop's CombineFileInputFormat to
> blend
> > the existing data (using Cassandra's ColumnFamilyInputFormat) with the
> newly
> > incoming data (using something like Hadoop's SequenceFileInputFormat).
> >
> > I was just wondering, has anyone here tried this, and were there issues?
> >  I'm worried because the CombineFileInputFormat has restrictions around
> > splits being from different pools so I don't know how this will play out
> > with data from both Cassandra and HDFS.  The other option, I suppose, is
> to
> > use a separate M/R process to replicate the data onto HDFS first, but I'd
> > rather avoid the extra step and duplication of storage.
> >
> > Also, if you've tackled a similar situation in the past using a different
> > approach, I'd be keen to hear about it...
> >
> >
> > Thanks
> > Mark
> >
>
>
>



Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Mark Schnitzius <ma...@cxense.com>.
Yes, I think this approach would be more efficient, but Ian's point about
failed runs is well taken.  It is still a problem with this approach.  I may
have to introduce a scheme where Hadoop's output is written to a new column
family, and then some sort of pointer is updated to point to this column
family at the end of a successful run.  That way, if a run failed, the
pointer would still point to the old data, and we could just rerun the
update task.  (Older and failed versions of the data would have to be
periodically purged as well.)
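
A minimal sketch of how that end-of-run pointer swap could look;
buildUpdateJob, setCurrentColumnFamily and the column family names below are
hypothetical placeholders for whatever job wiring and client write path are
actually in use:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Sketch only: buildUpdateJob, setCurrentColumnFamily and the column family
    // names are hypothetical placeholders for the real job wiring and client code.
    public class PointerSwapRunner {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String newCf = "counts_" + System.currentTimeMillis();  // fresh CF per run
        Job job = buildUpdateJob(conf, newCf);                   // job writes into newCf
        if (job.waitForCompletion(true)) {
          // Only a fully successful run moves the pointer; a failed run leaves it
          // on the previous column family, so the update can simply be rerun.
          setCurrentColumnFamily("Counters", newCf);
        }
        // Older and failed column families are purged by a separate housekeeping task.
      }

      private static Job buildUpdateJob(Configuration conf, String targetCf) {
        throw new UnsupportedOperationException("job wiring elided in this sketch");
      }

      /** Hypothetical: record which column family readers should treat as current. */
      private static void setCurrentColumnFamily(String pointerRow, String cf) {
        throw new UnsupportedOperationException("pointer write elided in this sketch");
      }
    }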

The other difficulty with this approach is massaging the existing Cassandra
data (which will come in as <String, SortedMap<byte[], IColumn>>) to look
like whatever the other input stream looks like, since a Hadoop mapper can
only take one key/value type.  Some sort of wrapper class should work, but
the input streams might even have different granularities (i.e. each entry
in the SortedMap<byte[], IColumn> map might correspond to a single input
row from the other input stream).
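
One possible shape for that wrapper: a small tagged Writable whose payload
encoding is deliberately left open. The class, constant and field names are
only illustrative:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Writable;

    // Sketch of a tagged wrapper value so existing Cassandra rows and new input
    // records can share one mapper/reducer value type. How each side is encoded
    // into the byte[] payload is left open; only the tagging is shown.
    public class TaggedValue implements Writable {
      public static final byte EXISTING = 0;  // came from ColumnFamilyInputFormat
      public static final byte INCOMING = 1;  // came from the SequenceFile input

      private byte source;
      private final BytesWritable payload = new BytesWritable();

      public void set(byte source, byte[] encoded) {
        this.source = source;
        this.payload.set(encoded, 0, encoded.length);
      }

      public byte getSource() { return source; }

      public byte[] getPayload() {
        byte[] copy = new byte[payload.getLength()];
        System.arraycopy(payload.getBytes(), 0, copy, 0, payload.getLength());
        return copy;
      }

      public void write(DataOutput out) throws IOException {
        out.writeByte(source);
        payload.write(out);
      }

      public void readFields(DataInput in) throws IOException {
        source = in.readByte();
        payload.readFields(in);
      }
    }

The granularity mismatch would then be handled in the mapper, which can emit
several tagged values per Cassandra row (or none at all).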

I'll put something on the wiki if I can make it work...


Cheers
Mark


On Fri, May 7, 2010 at 8:26 AM, Stu Hood <st...@rackspace.com> wrote:

> Ian: I think that as get_range_slice gets faster, the approach that Mark
> was heading toward may be considerably more efficient than reading the old
> value in the OutputFormat.
>
> Mark: Reading all of the data you want to update out of Cassandra using the
> InputFormat, merging it with (tagged) new data, and then performing
> deletes/inserts (without reads) in a Cassandra OutputFormat makes a lot of
> sense to me. It gives you much better data locality: all reads happen in a
> streaming fashion (mostly: this is where we need improvement for
> get_range_slices) at the beginning of the job.
>
>
> -----Original Message-----
> From: "Ian Kallen" <sp...@gmail.com>
> Sent: Thursday, May 6, 2010 5:14pm
> To: user@cassandra.apache.org
> Subject: Re: Updating (as opposed to just setting) Cassandra data via
> Hadoop
>
> I have inputs that are text logs and I wrote a Cassandra OutputFormat, the
> reducers read the old values from their respective column families,
> increment the counts and write back the new values. Since all of the writes
> are done by the hadoop jobs and we're not running multiple jobs
> concurrently, the writes aren't clobbering any values except the ones from
> the prior hadoop run. If/when atomic increments are available, we'd be able
> to run concurrent log processing jobs, but for now, this seems to work. I
> think the biggest risk is that a reduce task fails, hadoop restarts it and
> the replacement task re-increments the values. We're going to wait and see
> to determine how much of a problem this is in practice.
> -Ian
>
> On Tue, May 4, 2010 at 8:53 PM, Mark Schnitzius
> <ma...@cxense.com>wrote:
>
> > I have a situation where I need to accumulate values in Cassandra on an
> > ongoing basis.  Atomic increments are still in the works apparently (see
> > https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time
> being
> > I'll be using Hadoop, and attempting to feed in both the existing values
> and
> > the new values to a M/R process where they can be combined together and
> > written back out to Cassandra.
> >
> > The approach I'm taking is to use Hadoop's CombineFileInputFormat to
> blend
> > the existing data (using Cassandra's ColumnFamilyInputFormat) with the
> newly
> > incoming data (using something like Hadoop's SequenceFileInputFormat).
> >
> > I was just wondering, has anyone here tried this, and were there issues?
> >  I'm worried because the CombineFileInputFormat has restrictions around
> > splits being from different pools so I don't know how this will play out
> > with data from both Cassandra and HDFS.  The other option, I suppose, is
> to
> > use a separate M/R process to replicate the data onto HDFS first, but I'd
> > rather avoid the extra step and duplication of storage.
> >
> > Also, if you've tackled a similar situation in the past using a different
> > approach, I'd be keen to hear about it...
> >
> >
> > Thanks
> > Mark
> >
>
>
>

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Stu Hood <st...@rackspace.com>.
Ian: I think that as get_range_slice gets faster, the approach that Mark was heading toward may be considerably more efficient than reading the old value in the OutputFormat.

Mark: Reading all of the data you want to update out of Cassandra using the InputFormat, merging it with (tagged) new data, and then performing deletes/inserts (without reads) in a Cassandra OutputFormat makes a lot of sense to me. It gives you much better data locality: all reads happen in a streaming fashion (mostly: this is where we need improvement for get_range_slices) at the beginning of the job.
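
A rough sketch of what that reduce-side merge might boil down to, assuming the
mappers have already turned both the old Cassandra columns and the tagged new
records into plain (row key, count) pairs, and that a custom Cassandra
OutputFormat like the one Ian describes below turns the reducer output into
inserts:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch: by the time values reach the reducer, both the old count read from
    // Cassandra and the new increments for a row key have been mapped to longs,
    // so the reducer just sums them and emits a single write per key. No reads
    // happen here; the OutputFormat is assumed to turn the output into inserts.
    public class MergeCountsReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {

      private final LongWritable merged = new LongWritable();

      @Override
      protected void reduce(Text rowKey, Iterable<LongWritable> counts, Context context)
          throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable c : counts) {
          total += c.get();  // old value and new increments are summed alike
        }
        merged.set(total);
        context.write(rowKey, merged);
      }
    }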


-----Original Message-----
From: "Ian Kallen" <sp...@gmail.com>
Sent: Thursday, May 6, 2010 5:14pm
To: user@cassandra.apache.org
Subject: Re: Updating (as opposed to just setting) Cassandra data via Hadoop

I have inputs that are text logs and I wrote a Cassandra OutputFormat, the
reducers read the old values from their respective column families,
increment the counts and write back the new values. Since all of the writes
are done by the hadoop jobs and we're not running multiple jobs
concurrently, the writes aren't clobbering any values except the ones from
the prior hadoop run. If/when atomic increments are available, we'd be able
to run concurrent log processing jobs, but for now, this seems to work. I
think the biggest risk is that a reduce task fails, hadoop restarts it and
the replacement task re-increments the values. We're going to wait and see
to determine how much of a problem this is in practice.
-Ian

On Tue, May 4, 2010 at 8:53 PM, Mark Schnitzius
<ma...@cxense.com>wrote:

> I have a situation where I need to accumulate values in Cassandra on an
> ongoing basis.  Atomic increments are still in the works apparently (see
> https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time being
> I'll be using Hadoop, and attempting to feed in both the existing values and
> the new values to a M/R process where they can be combined together and
> written back out to Cassandra.
>
> The approach I'm taking is to use Hadoop's CombineFileInputFormat to blend
> the existing data (using Cassandra's ColumnFamilyInputFormat) with the newly
> incoming data (using something like Hadoop's SequenceFileInputFormat).
>
> I was just wondering, has anyone here tried this, and were there issues?
>  I'm worried because the CombineFileInputFormat has restrictions around
> splits being from different pools so I don't know how this will play out
> with data from both Cassandra and HDFS.  The other option, I suppose, is to
> use a separate M/R process to replicate the data onto HDFS first, but I'd
> rather avoid the extra step and duplication of storage.
>
> Also, if you've tackled a similar situation in the past using a different
> approach, I'd be keen to hear about it...
>
>
> Thanks
> Mark
>



Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Ian Kallen <sp...@gmail.com>.
I have inputs that are text logs and I wrote a Cassandra OutputFormat, the
reducers read the old values from their respective column families,
increment the counts and write back the new values. Since all of the writes
are done by the hadoop jobs and we're not running multiple jobs
concurrently, the writes aren't clobbering any values except the ones from
the prior hadoop run. If/when atomic increments are available, we'd be able
to run concurrent log processing jobs, but for now, this seems to work. I
think the biggest risk is that a reduce task fails, hadoop restarts it and
the replacement task re-increments the values. We're going to wait and see
to determine how much of a problem this is in practice.
-Ian
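
A rough sketch of the read-then-increment step described above, for
illustration only; CounterStore is a hypothetical stand-in for the real
Thrift/client code and its wiring is elided:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch of the read-then-increment pattern; CounterStore is a hypothetical
    // stand-in for the real Thrift/client code, and its wiring is elided.
    public class IncrementingReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {

      interface CounterStore {
        long readCount(String rowKey);  // fetch the previously stored count
      }

      private CounterStore store;  // assumed to be opened in setup(); elided here

      @Override
      protected void reduce(Text key, Iterable<LongWritable> newCounts, Context context)
          throws IOException, InterruptedException {
        long delta = 0;
        for (LongWritable c : newCounts) {
          delta += c.get();
        }
        long previous = store.readCount(key.toString());         // one read per key
        context.write(key, new LongWritable(previous + delta));  // write the new total
        // If the task fails part-way and is rerun, keys written before the failure
        // are incremented twice: the risk described above.
      }
    }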

On Tue, May 4, 2010 at 8:53 PM, Mark Schnitzius
<ma...@cxense.com>wrote:

> I have a situation where I need to accumulate values in Cassandra on an
> ongoing basis.  Atomic increments are still in the works apparently (see
> https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time being
> I'll be using Hadoop, and attempting to feed in both the existing values and
> the new values to a M/R process where they can be combined together and
> written back out to Cassandra.
>
> The approach I'm taking is to use Hadoop's CombineFileInputFormat to blend
> the existing data (using Cassandra's ColumnFamilyInputFormat) with the newly
> incoming data (using something like Hadoop's SequenceFileInputFormat).
>
> I was just wondering, has anyone here tried this, and were there issues?
>  I'm worried because the CombineFileInputFormat has restrictions around
> splits being from different pools so I don't know how this will play out
> with data from both Cassandra and HDFS.  The other option, I suppose, is to
> use a separate M/R process to replicate the data onto HDFS first, but I'd
> rather avoid the extra step and duplication of storage.
>
> Also, if you've tackled a similar situation in the past using a different
> approach, I'd be keen to hear about it...
>
>
> Thanks
> Mark
>

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Jonathan Ellis <jb...@gmail.com>.
It sounds reasonable to me, with the caveat that I have only limited
Hadoop knowledge.

Please write up a blog post when you get it working. :)

On Wed, May 5, 2010 at 10:44 PM, Mark Schnitzius
<ma...@cxense.com> wrote:
> Apologies, Hadoop recently deprecated a whole bunch of classes and I
> misunderstood how the new ones work.
> What I'll be doing is creating an InputFormat class that
> uses ColumnFamilyInputFormat to get splits from the existing Cassandra data,
> and merges them with splits from a SequenceFileInputFormat.
> Is this a reasonable approach, or is there a better, more standard way to
> update Cassandra data with new Hadoop data?  It may just boil down to a
> design decision, but it would seem to me that this would be a problem that
> would've been encountered many times before...
>
> Thanks
> Mark
>
>
> On Thu, May 6, 2010 at 12:23 AM, Jonathan Ellis <jb...@gmail.com> wrote:
>>
>> I'm a little confused.  CombineFileInputFormat is designed to combine
>> multiple small input splits into one larger one.  It's not for merging
>> data (that needs to be part of the reduce phase).  Maybe I'm
>> misunderstanding what you're saying.
>>
>> On Tue, May 4, 2010 at 10:53 PM, Mark Schnitzius
>> <ma...@cxense.com> wrote:
>> > I have a situation where I need to accumulate values in Cassandra on an
>> > ongoing basis.  Atomic increments are still in the works apparently
>> > (see https://issues.apache.org/jira/browse/CASSANDRA-721) so for the
>> > time
>> > being I'll be using Hadoop, and attempting to feed in both the existing
>> > values and the new values to a M/R process where they can be combined
>> > together and written back out to Cassandra.
>> > The approach I'm taking is to use Hadoop's CombineFileInputFormat to
>> > blend
>> > the existing data (using Cassandra's ColumnFamilyInputFormat) with the
>> > newly
>> > incoming data (using something like Hadoop's SequenceFileInputFormat).
>> > I was just wondering, has anyone here tried this, and were there issues?
>> >  I'm worried because the CombineFileInputFormat has restrictions around
>> > splits being from different pools so I don't know how this will play out
>> > with data from both Cassandra and HDFS.  The other option, I suppose, is
>> > to
>> > use a separate M/R process to replicate the data onto HDFS first, but
>> > I'd
>> > rather avoid the extra step and duplication of storage.
>> > Also, if you've tackled a similar situation in the past using a
>> > different
>> > approach, I'd be keen to hear about it...
>> >
>> > Thanks
>> > Mark
>>
>>
>>
>> --
>> Jonathan Ellis
>> Project Chair, Apache Cassandra
>> co-founder of Riptano, the source for professional Cassandra support
>> http://riptano.com
>
>



-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Mark Schnitzius <ma...@cxense.com>.
Apologies, Hadoop recently deprecated a whole bunch of classes and I
misunderstood how the new ones work.

What I'll be doing is creating an InputFormat class that
uses ColumnFamilyInputFormat to get splits from the existing Cassandra data,
and merges them with splits from a SequenceFileInputFormat.
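
A sketch of such a delegating InputFormat, reduced to the split-merging part;
the key/value types are left as Object because unifying the two sides behind
one tagged value type is the real work, and it is only hinted at here.
Anything about ColumnFamilyInputFormat beyond what the thread mentions is an
assumption:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    // Sketch of a delegating InputFormat: ask both underlying formats for their
    // splits and return the union. Wrapping the two record readers so that both
    // emit one common (tagged) value type is the hard part and is elided here.
    public class CassandraPlusSequenceFileInputFormat extends InputFormat<Object, Object> {

      private final ColumnFamilyInputFormat cassandra = new ColumnFamilyInputFormat();
      private final SequenceFileInputFormat<Object, Object> sequenceFiles =
          new SequenceFileInputFormat<Object, Object>();

      @Override
      public List<InputSplit> getSplits(JobContext context)
          throws IOException, InterruptedException {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        splits.addAll(cassandra.getSplits(context));      // existing Cassandra data
        splits.addAll(sequenceFiles.getSplits(context));  // newly arrived data on HDFS
        return splits;
      }

      @Override
      public RecordReader<Object, Object> createRecordReader(InputSplit split,
          TaskAttemptContext context) throws IOException, InterruptedException {
        // File splits belong to the SequenceFile side, everything else to Cassandra.
        if (split instanceof FileSplit) {
          return wrap(sequenceFiles.createRecordReader(split, context));
        }
        return wrap(cassandra.createRecordReader(split, context));
      }

      private RecordReader<Object, Object> wrap(RecordReader<?, ?> delegate) {
        throw new UnsupportedOperationException("tagging and wrapping elided in this sketch");
      }
    }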

Is this a reasonable approach, or is there a better, more standard way to
update Cassandra data with new Hadoop data?  It may just boil down to a
design decision, but it would seem to me that this would be a problem that
would've been encountered many times before...


Thanks
Mark


On Thu, May 6, 2010 at 12:23 AM, Jonathan Ellis <jb...@gmail.com> wrote:

> I'm a little confused.  CombineFileInputFormat is designed to combine
> multiple small input splits into one larger one.  It's not for merging
> data (that needs to be part of the reduce phase).  Maybe I'm
> misunderstanding what you're saying.
>
> On Tue, May 4, 2010 at 10:53 PM, Mark Schnitzius
> <ma...@cxense.com> wrote:
> > I have a situation where I need to accumulate values in Cassandra on an
> > ongoing basis.  Atomic increments are still in the works apparently
> > (see https://issues.apache.org/jira/browse/CASSANDRA-721) so for the
> time
> > being I'll be using Hadoop, and attempting to feed in both the existing
> > values and the new values to a M/R process where they can be combined
> > together and written back out to Cassandra.
> > The approach I'm taking is to use Hadoop's CombineFileInputFormat to
> blend
> > the existing data (using Cassandra's ColumnFamilyInputFormat) with the
> newly
> > incoming data (using something like Hadoop's SequenceFileInputFormat).
> > I was just wondering, has anyone here tried this, and were there issues?
> >  I'm worried because the CombineFileInputFormat has restrictions around
> > splits being from different pools so I don't know how this will play out
> > with data from both Cassandra and HDFS.  The other option, I suppose, is
> to
> > use a separate M/R process to replicate the data onto HDFS first, but I'd
> > rather avoid the extra step and duplication of storage.
> > Also, if you've tackled a similar situation in the past using a different
> > approach, I'd be keen to hear about it...
> >
> > Thanks
> > Mark
>
>
>
> --
> Jonathan Ellis
> Project Chair, Apache Cassandra
> co-founder of Riptano, the source for professional Cassandra support
> http://riptano.com
>

Re: Updating (as opposed to just setting) Cassandra data via Hadoop

Posted by Jonathan Ellis <jb...@gmail.com>.
I'm a little confused.  CombineFileInputFormat is designed to combine
multiple small input splits into one larger one.  It's not for merging
data (that needs to be part of the reduce phase).  Maybe I'm
misunderstanding what you're saying.

On Tue, May 4, 2010 at 10:53 PM, Mark Schnitzius
<ma...@cxense.com> wrote:
> I have a situation where I need to accumulate values in Cassandra on an
> ongoing basis.  Atomic increments are still in the works apparently
> (see https://issues.apache.org/jira/browse/CASSANDRA-721) so for the time
> being I'll be using Hadoop, and attempting to feed in both the existing
> values and the new values to a M/R process where they can be combined
> together and written back out to Cassandra.
> The approach I'm taking is to use Hadoop's CombineFileInputFormat to blend
> the existing data (using Cassandra's ColumnFamilyInputFormat) with the newly
> incoming data (using something like Hadoop's SequenceFileInputFormat).
> I was just wondering, has anyone here tried this, and were there issues?
>  I'm worried because the CombineFileInputFormat has restrictions around
> splits being from different pools so I don't know how this will play out
> with data from both Cassandra and HDFS.  The other option, I suppose, is to
> use a separate M/R process to replicate the data onto HDFS first, but I'd
> rather avoid the extra step and duplication of storage.
> Also, if you've tackled a similar situation in the past using a different
> approach, I'd be keen to hear about it...
>
> Thanks
> Mark



-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of Riptano, the source for professional Cassandra support
http://riptano.com