Posted to user@pig.apache.org by William Oberman <ob...@civicscience.com> on 2014/04/04 20:13:19 UTC

Fwd: using hadoop + cassandra for CF mutations (delete)

Apologies for cross posting!

My core issue is unblocked, but I'm still curious about one aspect of my
question to the cassandra mailing list.  How does Pig/Hadoop decide how
many tasks there are?  The forwarded email below has the gory details, but
basically:
-My Pig loadFunc was CassandraStorage
-The "table" (column family in cassandra) has something like a billion rows
in it, and I'd estimate ~3TB of data.
-No matter what I tried(*), Pig/Hadoop decided this was worthy of 20 tasks

(*) I changed settings in the loadFunc, I booted hadoop clusters with more
or fewer task slots, etc...

I'm using AWS's EMR, which claims to be hadoop 1.0.3 + pig 0.11.

will

---------- Forwarded message ----------
From: William Oberman <ob...@civicscience.com>
Date: Fri, Apr 4, 2014 at 12:24 PM
Subject: using hadoop + cassandra for CF mutations (delete)
To: "user@cassandra.apache.org" <us...@cassandra.apache.org>


Hi,

I have some history with cassandra + hadoop:
1.) Single DC + integrated hadoop = Was "ok" until I needed steady
performance (the single DC was used in a production environment)
2.) Two DCs + integrated hadoop on 1 of the 2 DCs = Was "ok" until my data
grew; in AWS, compute is expensive compared to data storage... e.g.
running a 24x7 DC was a lot more expensive than the following solution...
3.) Single DC + a constant "ETL" to S3 = Is still ok, since I can spawn an
"arbitrarily large" EMR cluster, and 24x7 data storage + transient EMR is
cost effective.

But, one of my CFs has had a change in usage pattern that makes a large
percentage of the data, though not all of it, fairly pointless to store.
I thought I'd write a Pig UDF that could peek at a row of data and delete
it if it fails my criteria.  And it "works" in terms of logic, but not in
terms of practical execution.  The CF in question has O(billion) keys, and
afterwards it will have at most ~10% of that.
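
(Roughly what I mean, as a sketch only; the jar, UDF class, keyspace, and
CF names below are placeholders rather than my real code.)

  REGISTER my-udfs.jar;
  -- hypothetical UDF that inspects one row and issues a delete as a side effect
  DEFINE MaybeDelete com.example.cassandra.MaybeDelete();

  rows = LOAD 'cassandra://my_keyspace/my_cf'
         USING org.apache.cassandra.hadoop.pig.CassandraStorage();

  -- forcing some output makes the map tasks actually run over every row
  checked = FOREACH rows GENERATE MaybeDelete(*);
  STORE checked INTO '/tmp/delete-audit' USING PigStorage();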

I basically keep losing the jobs due to too many task failures, all rooted
in:
Caused by: TimedOutException()
    at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:13020)

And yes, I've messed around with:
-Number of failures for map/reduce/tracker (in the hadoop confs)
-split_size (on the URL)
-cassandra.range.batch.size

But it hasn't helped.  My failsafe is to roll my own distributed process,
rather than falling into a pit of internal hadoop settings.  But I feel
like I'm close.
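
(For concreteness, this is roughly where those knobs sit in the script;
the values and the keyspace/CF names are placeholders, and the exact
parameter names should be double-checked against the CassandraStorage docs
for your version.)

  -- placeholder values throughout
  SET cassandra.range.batch.size 512;   -- rows fetched per get_range_slices page
  SET mapred.map.max.attempts 8;        -- more per-task retries before the job fails

  -- split_size on the URL roughly controls how many rows go into each input
  -- split, and therefore how many map tasks the job gets
  rows = LOAD 'cassandra://my_keyspace/my_cf?split_size=16384'
         USING org.apache.cassandra.hadoop.pig.CassandraStorage();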

The problem, in my opinion, watching how things are going, is the
correlation of splits <-> tasks.  I'm obviously using Pig, so this part of
the process is fairly opaque to me at the moment.  But, "something
somewhere" is picking 20 tasks for my job, and this is fairly independent
of the # of task slots (I've booted EMR clusters with different #'s and
always get 20).  Why does this matter?  When a task fails, it retries from
the start, which is a killer for me since I "delete as I go": the retried
work is pointless, and it massively increases the odds of an overall job
failure.  If hadoop/pig chose a larger number of tasks, the retries would
be much less of a burden.  But, I don't see where/what lets me mess with
that logic.

Pig gives the ability to mess with reducers (PARALLEL), but I'm in the load
path, which is all mappers.  I've never jumped to the lower, raw hadoop
level before.  But, I'm worried that will be the "falling into a pit"
issue...

I'm using Cassandra 1.2.15.

will

Re: using hadoop + cassandra for CF mutations (delete)

Posted by William Oberman <ob...@civicscience.com>.
That's an excellent setting to know about!  But, I believe it depends on
the InputFormat implementing CombineFileInputFormat, and Cassandra does not.

Very cool though, thanks.


On Sun, Apr 6, 2014 at 12:43 AM, Dotan Patrich <do...@fortscale.com> wrote:

> Hi Will,
>
> Have you tried setting this directive at the top of your pig file:
> SET pig.maxCombinedSplitSize <some split size>
>
> It works for me on CDH 4.4, although my data source is HDFS files and not
> Cassandra
>
> Regards,
> Dotan
>

Re: using hadoop + cassandra for CF mutations (delete)

Posted by Dotan Patrich <do...@fortscale.com>.
Hi Will,

Have you tried setting this directive at the top of your pig file:
SET pig.maxCombinedSplitSize <some split size>

It works for me on CDH 4.4, although my data source is HDFS files and not
Cassandra
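
(Something like this at the top of the script; the 256 MB value, given in
bytes, and the input path are just example placeholders.)

  -- combine small input splits so each map task gets up to ~256 MB of input
  SET pig.maxCombinedSplitSize 268435456;

  events = LOAD '/data/events/2014-04-04' USING PigStorage('\t');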

Regards,
Dotan

Re: Fwd: using hadoop + cassandra for CF mutations (delete)

Posted by Suraj Nayak <sn...@gmail.com>.
Good way of experimenting, Will.  Share your observations :)

Adding the cassandra user group for the community's input on the
num_tokens setting in cassandra.yaml.

Thanks
Suraj
On 07-Apr-2014 6:20 PM, "William Oberman" <ob...@civicscience.com> wrote:

> If that works, it's a neat/fancy trick.  But, after looking into the docs
> for that setting, which relates to vnodes (and I'm not using vnodes), I'm
> not comfortable messing with it on my production cluster.
>
> I guess this implies that the # of mappers is related to the number of
> physical/virtual nodes of the cassandra cluster, which makes sense...
>
> I think I'm just going to hack up a script that I can run locally on a
> cassandra box, and only run queries/writes/deletes against the local token
> range (and get parallelism from cassandra's distributed nature).
>
> will

Re: Fwd: using hadoop + cassandra for CF mutations (delete)

Posted by William Oberman <ob...@civicscience.com>.
If that works, it's a neat/fancy trick.  But, after looking into the docs
for that setting, which relates to vnodes (and I'm not using vnodes), I'm
not comfortable messing with it on my production cluster.

I guess this implies that the # of mappers is related to the number of
physical/virtual nodes of the cassandra cluster, which makes sense...

I think I'm just going to hack up a script that I can run locally on a
cassandra box, and only run queries/writes/deletes against the local token
range (and get parallelism from cassandra's distributed nature).

will


On Sun, Apr 6, 2014 at 7:30 AM, Suraj Nayak <sn...@gmail.com> wrote:

> Hi Will,
>
> Try changing the value of num_tokens in conf/cassandra.yaml.
>
> Set it to your desired value minus 1.
>
> For example, if you want 5 map tasks to run, set num_tokens to 4
> (the default is 256).
>
> I encountered almost the same situation when I was trying to load or write
> very small data from/into Cassandra.  It was launching 257 map tasks.
> When I reduced num_tokens to 1, Pig launched only 2 tasks.  Do restart the
> Cassandra service after the change.
>
> Hope it might help.
>
> --
> Suraj

Re: Fwd: using hadoop + cassandra for CF mutations (delete)

Posted by Suraj Nayak <sn...@gmail.com>.
Hi Will,

Try changing the value of num_tokens in conf/cassandra.yaml.

Set it to your desired value minus 1.

For example, if you want 5 map tasks to run, set num_tokens to 4
(the default is 256).

I encountered almost the same situation when I was trying to load or write
very small data from/into Cassandra.  It was launching 257 map tasks.
When I reduced num_tokens to 1, Pig launched only 2 tasks.  Do restart the
Cassandra service after the change.

Hope it might help.

--
Suraj