Posted to user@cassandra.apache.org by Patrik Modesto <pa...@gmail.com> on 2011/01/25 09:37:34 UTC

[mapreduce] ColumnFamilyRecordWriter hidden reuse

Hi,

I'm playing with Cassandra 0.7.0 and Hadoop, developing simple MapReduce
tasks. While developing a really simple MR task, I found that a
combination of a Hadoop optimization and the Cassandra
ColumnFamilyRecordWriter queue creates wrong keys to send to
batch_mutate(). The problem is in the reduce part: the storage behind
the key parameter is reused. For example, when storing URLs I get:

http://119.cz/index.php/vypalovaci-mechaniky-a-vypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista
 (1)
http://11superstars.xf.cz/index.php?page=12y-a-vypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista
 (2)
http://12kmenu.unas.cz/18-6-2011-(Isachar).htmlvypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista
 (3)

You can see that the tail of URL (1) is repeated in URL (2) and URL (3).

I've changed my reduce method to clone the key before calling
context.write(), but I think it should be cloned inside the Cassandra
ColumnFamilyRecordWriter, because as a user I don't care how it is
implemented inside; I just write values there. With FileOutputFormat,
for example, I don't need to clone the key when writing to it.
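
The effect described above can be reproduced with the JDK alone. This is
a minimal sketch, not the Hadoop or Cassandra code: ReusedKey is a
hypothetical stand-in for a mutable key object whose backing array is
overwritten in place, and the list stands in for a writer that queues
keys and sends them later. The URLs are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a reused mutable key: one backing array,
// overwritten in place for every record and never shrunk.
class ReusedKey {
    private byte[] bytes = new byte[0];
    private int length = 0;

    void set(String s) {
        byte[] src = s.getBytes(StandardCharsets.UTF_8);
        if (src.length > bytes.length) {
            bytes = new byte[src.length]; // grow only when needed
        }
        System.arraycopy(src, 0, bytes, 0, src.length);
        length = src.length;
    }

    byte[] getBytes() { return bytes; }   // backing array, may hold stale bytes
    int getLength() { return length; }    // number of valid bytes
}

class KeyReuseDemo {
    static String decode(ByteBuffer b) {
        return StandardCharsets.UTF_8.decode(b.duplicate()).toString();
    }

    // Queue two keys the way a batching writer might, then read them back.
    static List<String> run(boolean copy) {
        ReusedKey key = new ReusedKey();
        List<ByteBuffer> queue = new ArrayList<>();
        String[] urls = {"http://example.org/a-long-path", "http://ex.org/b"};
        for (String url : urls) {
            key.set(url);
            ByteBuffer bb = copy
                ? ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength()))
                : ByteBuffer.wrap(key.getBytes(), 0, key.getLength()); // shares the array
            queue.add(bb);
        }
        List<String> out = new ArrayList<>();
        for (ByteBuffer bb : queue) {
            out.add(decode(bb));
        }
        return out;
    }
}
```

With copy == false the first queued key keeps its original length but
sees the bytes of the second key written over its start, which is the
same pattern as the URLs above. With copy == true both keys come back
intact.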

I'd like to know your opinion.

Best regards,
Patrik

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Mck <mc...@apache.org>.
On Wed, 2011-01-26 at 12:13 +0100, Patrik Modesto wrote:
> BTW, how do I get the current time in microseconds in Java?

I'm using HFactory.clock() (from hector).
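
For readers without Hector, here is a minimal JDK-only sketch of a
microsecond timestamp source. It is not how HFactory.clock() is
implemented; MicrosClock and nextMicros() are hypothetical names. It
assumes millisecond wall-clock resolution is all that is available and
simply bumps the value by one whenever two calls would otherwise return
the same number, so timestamps are unique within one JVM only.

```java
import java.util.concurrent.atomic.AtomicLong;

class MicrosClock {
    // Last value handed out, shared by all threads in this JVM.
    private static final AtomicLong last = new AtomicLong();

    // Wall-clock time in microseconds, forced to be strictly increasing.
    static long nextMicros() {
        while (true) {
            long now = System.currentTimeMillis() * 1000L;
            long prev = last.get();
            long next = Math.max(now, prev + 1);
            if (last.compareAndSet(prev, next)) {
                return next;
            }
        }
    }
}
```

Usage would be along the lines of d.timestamp = MicrosClock.nextMicros();
Writers on different machines can still collide, so this only helps
within a single process.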

> > As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..)
> > won't this hurt performance? 
> 
> The size of the queue is computed at runtime:
> ColumnFamilyOutputFormat.QUEUE_SIZE, 32 *
> Runtime.getRuntime().availableProcessors()
> So the queue is not too large so I'd say the performance shouldn't get hurt. 

This is only the default.
I'm running w/ 80000. Testing has shown this gives the best throughput
for me when processing 25+ million rows...

In the end it is still 25+ million .clone(..) calls. 

> The key isn't the only potential live byte[]. You also have names and
> values in all the columns (and supercolumns) for all the mutations.

Now make that over a billion .clone(..) calls... :-(

byte[] copies are relatively quick and cheap, but i am still seeing a
degradation in m/r reduce performance when cloning keys.
It's not that you don't have my vote here, i'm just stating my
uncertainty about what the correct API should be.

~mck

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Patrik Modesto <pa...@gmail.com>.
On Wed, Jan 26, 2011 at 08:58, Mck <mc...@apache.org> wrote:
>> You are correct that microseconds would be better but for the test it
>> doesn't matter that much.
>
> Have you tried. I'm very new to cassandra as well, and always uncertain
> as to what to expect...

IMHO it's a matter of use case. In my use case there is no possibility
of two (or more) processes writing/updating the same key, so
milliseconds are fine for me.

BTW, how do I get the current time in microseconds in Java?

> As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..)
> won't this hurt performance? Normally i would _always_ agree that a
> defensive copy of an array/collection argument be stored, but has this
> intentionally not been done (or should it) because of large reduce jobs
> (millions of records) and the performance impact here.

The size of the queue is computed at runtime:
ColumnFamilyOutputFormat.QUEUE_SIZE, 32 *
Runtime.getRuntime().availableProcessors()
So the queue is not too large, and I'd say performance shouldn't suffer.
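
As a rough illustration of the sizing formula quoted above, the sketch
below computes the same default and creates a bounded queue of that
capacity. QueueSizing and its methods are hypothetical names, and the
use of ArrayBlockingQueue is an assumption for illustration, not a
statement about what ColumnFamilyRecordWriter uses internally.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueSizing {
    // Default described in the thread: 32 slots per available processor.
    static int defaultQueueSize() {
        return 32 * Runtime.getRuntime().availableProcessors();
    }

    // Hypothetical bounded queue of pending keys; put() blocks when it is full.
    static BlockingQueue<ByteBuffer> newQueue(int capacity) {
        return new ArrayBlockingQueue<>(capacity);
    }
}
```

The capacity bounds how many copied keys are alive at the same time. It
does not bound the total number of copies made over a job, which is one
per record written.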

 --
Patrik

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Mck <mc...@apache.org>.
> >  is "d.timestamp = System.currentTimeMillis();" ok?
> 
> You are correct that microseconds would be better but for the test it
> doesn't matter that much. 

Have you tried? I'm very new to cassandra as well, and always uncertain
as to what to expect...


> ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength())); 

An alternative approach to your client-side cloning is 

  ByteBuffer bbKey = ByteBuffer.wrap(key.toString().getBytes(UTF_8)); 

Here at least it is obvious you are passing in the bytes from an immutable object.
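
Both approaches end up with a buffer that owns its bytes. A minimal
JDK-only sketch of each follows; BufferCopy, copyOf and fromString are
hypothetical names, and copyOf is only an approximation of what a
clone helper such as ByteBufferUtil.clone is used for here, not its
actual implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferCopy {
    // Copy the remaining bytes of src into a freshly allocated buffer.
    static ByteBuffer copyOf(ByteBuffer src) {
        ByteBuffer dst = ByteBuffer.allocate(src.remaining());
        dst.put(src.duplicate()); // duplicate() leaves src's position untouched
        dst.flip();               // make the copied bytes readable from index 0
        return dst;
    }

    // Go through an immutable String; getBytes() always returns a new array.
    static ByteBuffer fromString(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }
}
```

The String route makes the ownership obvious at the call site but pays
for a decode and an encode per key; the direct copy only pays for the
array copy.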

As for moving the clone(..) into ColumnFamilyRecordWriter.write(..),
won't this hurt performance? Normally i would _always_ agree that a
defensive copy of an array/collection argument should be stored, but has
this intentionally not been done (or should it be) because of large
reduce jobs (millions of records) and the performance impact here?

The key isn't the only potential live byte[]. You also have names and
values in all the columns (and supercolumns) for all the mutations.


~mck


Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Patrik Modesto <pa...@gmail.com>.
On Tue, Jan 25, 2011 at 19:09, Mick Semb Wever <mc...@apache.org> wrote:

> In fact i have another problem (trying to write an empty byte[], or
> something, as a key, which put one whole row out of whack, ((one row in
> 25 million...))).
>
> But i'm debugging along the same code.
>
> I don't quite understand how the byte[] in
> ByteBuffer.wrap(key.getBytes(),...)
> gets clobbered.

A code snippet would help here.

> Well your key is a mutable Text object, so i can see some possibility
> depending on how hadoop uses these objects.
> Is there something to ByteBuffer.allocate(..) i'm missing...

I don't know, I'm quite new to Java (but with a long C++ history).

> btw.
>  is "d.timestamp = System.currentTimeMillis();" ok?
>  shouldn't this be microseconds so that each mutation has a different
> timestamp? http://wiki.apache.org/cassandra/DataModel

You are correct that microseconds would be better but for the test it
doesn't matter that much.

Patrik

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Jonathan Ellis <jb...@gmail.com>.
On Tue, Jan 25, 2011 at 12:09 PM, Mick Semb Wever <mc...@apache.org> wrote:
> Well your key is a mutable Text object, so i can see some possibility
> depending on how hadoop uses these objects.

Yes, that's it exactly.  We recently fixed a bug in the demo
word_count program for this. Now we do
ByteBuffer.wrap(Arrays.copyOf(text.getBytes(), text.getLength())).
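
A small JDK-only sketch of why the length argument matters in that
expression: the array returned by getBytes() can be longer than the
valid data, so the copy has to be cut at getLength(). SnapshotKey and
snapshot() are hypothetical names, and the plain byte[] here stands in
for the key's backing array.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class SnapshotKey {
    // backing may be longer than the valid data; only [0, length) is meaningful.
    static ByteBuffer snapshot(byte[] backing, int length) {
        // Arrays.copyOf allocates a new array, so later writes to backing
        // cannot change the returned buffer.
        return ByteBuffer.wrap(Arrays.copyOf(backing, length));
    }
}
```

For example, snapshot(text.getBytes(), text.getLength()) would give a
key that stays stable even if the framework later refills the same
object.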

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Mick Semb Wever <mc...@apache.org>.
On Tue, 2011-01-25 at 14:16 +0100, Patrik Modesto wrote:
> The attached file contains the working version with the key cloned in
> the reduce() method. My other approach was:
> 
> > context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
> > Collections.singletonList(getMutation(key)));
> 
> Which produces junk keys. 

In fact i have another problem (trying to write an empty byte[], or
something, as a key, which put one whole row out of whack, ((one row in
25 million...))).

But i'm debugging along the same code.

I don't quite understand how the byte[] in 
ByteBuffer.wrap(key.getBytes(),...)
gets clobbered.
Well your key is a mutable Text object, so i can see some possibility
depending on how hadoop uses these objects.
Is there something to ByteBuffer.allocate(..) i'm missing...

btw.
 is "d.timestamp = System.currentTimeMillis();" ok?
 shouldn't this be microseconds so that each mutation has a different
timestamp? http://wiki.apache.org/cassandra/DataModel


~mck


-- 
"As you go the way of life, you will see a great chasm. Jump. It is not
as wide as you think." Native American Initiation Rite 
| http://semb.wever.org | http://sesat.no
| http://finn.no       | Java XSS Filter

-- 
"Everything should be made as simple as possible, but not simpler."
Albert Einstein (William of Ockham) 
| http://semb.wever.org | http://sesat.no
| http://finn.no       | Java XSS Filter

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Patrik Modesto <pa...@gmail.com>.
Hi Mick,

Attached is a very simple MR job that deletes expired URLs from my
test Cassandra DB. The keyspace looks like this:

Keyspace: Test:
  Replication Strategy: org.apache.cassandra.locator.SimpleStrategy
    Replication Factor: 2
  Column Families:
    ColumnFamily: Url2
      Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
      Row cache size / save period: 0.0/0
      Key cache size / save period: 200000.0/3600
      Memtable thresholds: 4.7015625/1003/60
      GC grace seconds: 864000
      Compaction min/max thresholds: 4/32
      Read repair chance: 1.0
      Built indexes: []

In the CF the key is the URL, and each row holds some data. My MR job
needs just "expire_date", which is an int64 timestamp. For now I store
it as a string because I also use Python and C++ to manipulate the
data.

To run the MR job you need a patch I made. You can find it here:
https://issues.apache.org/jira/browse/CASSANDRA-2014

The attached file contains the working version with the key cloned in
the reduce() method. My other approach was:
[code]
context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
Collections.singletonList(getMutation(key)));
[/code]
Which produces junk keys.

Best regards,
Patrik

Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

Posted by Mick Semb Wever <mc...@apache.org>.
On Tue, 2011-01-25 at 09:37 +0100, Patrik Modesto wrote:
> While developing a really simple MR task, I found that a
> combination of a Hadoop optimization and the Cassandra
> ColumnFamilyRecordWriter queue creates wrong keys to send to
> batch_mutate(). 

I've seen similar behaviour (junk rows being written), although my keys
are always a result from
  LongSerializer.get().toByteBuffer(key)


i'm interested in looking into it - but can you provide a code example? 

  From what i can see TextOutputFormat.LineRecordWriter.write(..)
doesn't clone anything, but it does write it out immediately.
  While ColumnFamilyRecordWriter does batch the mutations up as you say,
it takes a ByteBuffer as a key, so why/how are you re-using this
client-side (aren't you creating a new ByteBuffer on each call to
write(..))?

~mck

-- 
"Never let your sense of morals get in the way of doing what's right."
Isaac Asimov 
| http://semb.wever.org | http://sesat.no
| http://finn.no       | Java XSS Filter