Posted to user@cassandra.apache.org by "Matthew E. Kennedy" <ma...@spadac.com> on 2011/02/01 20:06:52 UTC

Pig not reading all cassandra data

I'm running Cassandra 0.7 and I'm trying to get Pig integration to work correctly.  I'm using Pig 0.8 running against Hadoop 0.20.2; I've also tried this running against CDH2.

I can log into the grunt shell, and execute scripts, but when they run, they don't read all of the data from Cassandra.  The job only results in one mapper being created, and that only reads a small fraction of the data on a node.  I don't see any obvious error messages anywhere, so I'm not sure how to pinpoint the problem.

To confirm that I had the cluster set up correctly, I wrote a simple MapReduce job in Java that seems to use the ColumnFamily input format correctly and appears to distribute the work across all the nodes in the cluster.  I had a small number of killed jobs at the end of the process though, and I'm not sure whether that is a symptom of something.  It looked like the map phase would have been much faster if those jobs weren't waiting to be killed.  But the output was correct: I compared it to a job that operated on the source data that I used to populate the cluster, and the output was identical.  In case it's interesting, this data is 134 million records; the Cassandra MapReduce job ran in 14 minutes, and the same calculation running on the raw data in HDFS took three minutes.

I suspected at first that I was not correctly connecting the grunt shell to the cluster, but when I start grunt it reports the correct URLs for HDFS and the job tracker.

When the job appears in the job tracker web UI, it is only executing one map.

What's really interesting is that Pig reports that it read 65k input records.  When I multiply 65k by the number of maps spawned by the Java MapReduce job that actually works, I get 134 million, which is the number of records I'm reading.  So it looks like the input split size is being calculated correctly, but only one of the maps gets executed.  That has me kind of stumped.
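
As a rough check of that arithmetic, the sketch below works backwards from the two figures given in the post (about 134 million records in total, and the 64985 records Pig reported) to the number of equally sized splits they imply. The class and method names are made up for illustration, and the total is the rounded figure from the post, so the result is only an estimate of how many maps the working Java job would have spawned.

```java
public class ImpliedSplitCount {
    // Estimate how many equally sized splits are needed to cover all records.
    static long impliedSplits(long totalRecords, long recordsPerSplit) {
        return Math.round((double) totalRecords / recordsPerSplit);
    }

    public static void main(String[] args) {
        long totalRecords = 134_000_000L; // "134 million" from the post (rounded)
        long recordsPerSplit = 64_985L;   // records Pig reported reading in its single map
        // Prints 2062: roughly two thousand splits, of which Pig ran only one.
        System.out.println(impliedSplits(totalRecords, recordsPerSplit));
    }
}
```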

Here is the grunt session with line numbers prepended:

  1 cassandra@rdcl000:~/benchmark/cassandra-0.7.0/contrib/pig$ bin/pig_cassandra
  2 2011-02-01 12:47:02,353 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/cassandra/benchmark/cassandra-0.7.0/contrib/pig/pig_1296582422349.log
  3 2011-02-01 12:47:02,538 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://rdcl000:9000
  4 2011-02-01 12:47:02,644 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: rdcl000:9001
  5 grunt> register /home/hadoop/local/pig/pig-0.8.0-core.jar; register /home/cassandra/benchmark/cassandra-0.7.0/lib/libthrift-0.5.jar;
  6 grunt> rows = LOAD 'cassandra://rdclks/mycftest' USING CassandraStorage();
  7 grunt> countthis = GROUP rows ALL;
  8 grunt> countedrows = FOREACH countthis GENERATE COUNT(rows.$0);
  9 grunt> dump countedrows;
 10 2011-02-01 12:47:31,219 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
 11 2011-02-01 12:47:31,219 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
 12 2011-02-01 12:47:31,397 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: countedrows: Store(hdfs://rdcl000:9000/tmp/temp-1188844399/tmp1986503871:org.apache.pig.impl.io.InterStorage) - scope-10 Operator Key: scope-10)
 13 2011-02-01 12:47:31,408 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
 14 2011-02-01 12:47:31,419 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
 15 2011-02-01 12:47:31,447 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
 16 2011-02-01 12:47:31,447 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
 17 2011-02-01 12:47:31,478 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
 18 2011-02-01 12:47:31,491 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
 19 2011-02-01 12:47:35,418 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
 20 2011-02-01 12:47:35,478 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
 21 2011-02-01 12:47:35,980 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
 22 2011-02-01 12:47:35,995 [Thread-13] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
 23 2011-02-01 12:47:36,750 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201101241634_0183
 24 2011-02-01 12:47:36,750 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://rdcl000:50030/jobdetails.jsp?jobid=job_201101241634_0183
 25 2011-02-01 12:47:57,793 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
 26 2011-02-01 12:48:16,346 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
 27 2011-02-01 12:48:16,347 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics:
 28 
 29 HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
 30 0.20.2  0.8.0   cassandra       2011-02-01 12:47:31     2011-02-01 12:48:16     GROUP_BY
 31 
 32 Success!
 33 
 34 Job Stats (time in seconds):
 35 JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
 36 job_201101241634_0183   1       1       18      18      18      12      12      12      countedrows,countthis,rows      GROUP_BY,COMBINER       hdfs://rdcl000:9000/tmp/temp-1188844399/tmp1986503871,
 37 
 38 Input(s):
 39 Successfully read 64985 records from: "cassandra://rdcl/famstest"
 40 
 41 Output(s):
 42 Successfully stored 1 records (14 bytes) in: "hdfs://rdcl000:9000/tmp/temp-1188844399/tmp1986503871"
 43 
 44 Counters:
 45 Total records written : 1
 46 Total bytes written : 14
 47 Spillable Memory Manager spill count : 0
 48 Total bags proactively spilled: 0
 49 Total records proactively spilled: 0
 50 
 51 Job DAG:
 52 job_201101241634_0183
 53 
 54 
 55 2011-02-01 12:48:16,352 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
 56 2011-02-01 12:48:16,374 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
 57 2011-02-01 12:48:16,374 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
 58 (64985)

Any help is really appreciated.

-Matt Kennedy



Re: Pig not reading all cassandra data

Posted by Jonathan Ellis <jb...@gmail.com>.
Thanks a lot for the help on this!

From what I can tell that looks like a good solution.  Created
https://issues.apache.org/jira/browse/CASSANDRA-2184 to make that
change.

On Thu, Feb 17, 2011 at 11:52 AM, Matt Kennedy <st...@gmail.com> wrote:
> I have a resolution for how I'm dealing with this problem for my particular
> situation and I'd like to throw it out there to see if you think it should
> be integrated into the core Cassandra code.
>
> Just to repeat, the immediate workaround for this is to set
> -Dpig.splitCombination=false when you launch pig.
>
> However, we wanted to keep splitCombination on because it is a useful
> optimization for a lot of our use cases, so I went digging for the least
> intrusive way to keep the split combiner on, but also prevent it from
> combining splits that read from Cassandra.  My solution, which you are
> welcome to critique, is to change line 65 of
> http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
> such that it returns Long.MAX_VALUE instead of zero.
>
> That effectively turns off split combination in Pig 0.8 when reading from
> Cassandra, but leaves it on for everything else.  So far, I can't see any
> negative side effects from it.
>
> Thoughts?



-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com

Re: Pig not reading all cassandra data

Posted by Matt Kennedy <st...@gmail.com>.
I have a resolution for how I'm dealing with this problem for my particular
situation and I'd like to throw it out there to see if you think it should
be integrated into the core Cassandra code.

Just to repeat, the immediate workaround for this is to set
-Dpig.splitCombination=false when you launch pig.

However, we wanted to keep splitCombination on because it is a useful
optimization for a lot of our use cases, so I went digging for the least
intrusive way to keep the split combiner on, but also prevent it from
combining splits that read from Cassandra.  My solution, which you are
welcome to critique, is to change line 65 of
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
such that it returns Long.MAX_VALUE instead of zero.

That effectively turns off split combination in Pig 0.8 when reading from
Cassandra, but leaves it on for everything else.  So far, I can't see any
negative side effects from it.
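
To illustrate why a length of Long.MAX_VALUE would keep a split out of any combined group, here is a minimal sketch. It does not use the real Hadoop, Pig, or Cassandra classes: SizedSplit, UnknownSizeSplit, and fitsInGroup are hypothetical stand-ins, and the rule that a split joins a group only if its reported length fits in the remaining size budget is an assumption about how a size-based combiner behaves, not Pig's actual code.

```java
public class MaxLengthSplitSketch {
    // Hypothetical stand-in for the getLength() part of an input split.
    interface SizedSplit {
        long getLength();
    }

    // Hypothetical split that cannot know its byte size and reports the
    // largest possible value, as proposed in the message above.
    static final class UnknownSizeSplit implements SizedSplit {
        @Override
        public long getLength() {
            return Long.MAX_VALUE;
        }
    }

    // Assumed rule: a split may join a group only if its reported length
    // fits in what is left of the group's size budget.
    static boolean fitsInGroup(SizedSplit split, long bytesAlreadyInGroup, long maxCombinedSize) {
        long remaining = maxCombinedSize - bytesAlreadyInGroup;
        return split.getLength() <= remaining;
    }

    public static void main(String[] args) {
        long maxCombinedSize = 128L * 1024 * 1024; // assumed 128 MB budget
        SizedSplit zeroLength = () -> 0L;          // a split that reports size zero

        // A zero-length split always fits, so every split can be merged into one group.
        System.out.println(fitsInGroup(zeroLength, 0L, maxCombinedSize));             // true
        // A Long.MAX_VALUE split never fits, so it is left as its own map.
        System.out.println(fitsInGroup(new UnknownSizeSplit(), 0L, maxCombinedSize)); // false
    }
}
```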

Thoughts?



Re: Pig not reading all cassandra data

Posted by Matt Kennedy <st...@gmail.com>.
Sorry it has taken me a while to get back to this.  I'm still trying to get
to the bottom of this to find where the disconnect is between the column
family input format code and the Pig optimizer.

I suspected that the problem was line 365 of:
http://svn.apache.org/viewvc/pig/tags/release-0.8.0/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?view=markup

...but I changed the ColumnFamilySplit.java file so that it returns -1
instead of 0, the result of which is that the Pig job will iterate over the
entirety of the cassandra data that it is supposed to, but it does so with
only one mapper.  It looks like the Pig map combiner isn't using the
split.getLength call to determine how the maps get combined as I originally
suspected.  I'll update when I figure more out.

-Matt


Re: Pig not reading all cassandra data

Posted by Jonathan Ellis <jb...@gmail.com>.
On Fri, Feb 4, 2011 at 9:47 PM, Matt Kennedy <st...@gmail.com> wrote:
> Found the culprit.  There is a new feature in Pig 0.8 that will try to
> reduce the number of splits used to speed up the whole job.  Since the
> ColumnFamilyInputFormat lists the input size as zero, this feature
> eliminates all of the splits except for one.
>
> The workaround is to disable this feature for jobs that use CassandraStorage
> by setting -Dpig.splitCombination=false in the pig_cassandra script.
>
> Hope somebody finds this useful, you wouldn't believe how many dead-ends I
> ran down trying to figure this out.

Ouch, thanks for tracking that down.

What should CFIF be returning differently?  Do you mean the
InputSplit.getLength?

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com

Re: Pig not reading all cassandra data

Posted by Matt Kennedy <st...@gmail.com>.
Found the culprit.  There is a new feature in Pig 0.8 that will try to reduce the number of splits used to speed up the whole job.  Since the ColumnFamilyInputFormat lists the input size as zero, this feature eliminates all of the splits except for one.  

The workaround is to disable this feature for jobs that use CassandraStorage by setting -Dpig.splitCombination=false in the pig_cassandra script.
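
The effect described above can be modelled with a simplified size-based combiner. This is only a sketch under stated assumptions: the greedy packing rule, the 128 MB budget, and the class and method names are all illustrative and are not taken from Pig's source. It shows how splits that report a length of zero never fill the budget and so end up in a single group, while splits that report a real size are grouped into several maps.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCombinerSketch {
    // Greedily pack consecutive splits into a group until adding the next
    // split would exceed maxCombinedSize. Returns the number of original
    // splits in each resulting group (one group corresponds to one map).
    static List<Integer> combine(long[] reportedLengths, long maxCombinedSize) {
        List<Integer> groupSizes = new ArrayList<>();
        long groupBytes = 0;
        int groupCount = 0;
        for (long length : reportedLengths) {
            // Written as a subtraction so very large lengths cannot overflow.
            if (groupCount > 0 && length > maxCombinedSize - groupBytes) {
                groupSizes.add(groupCount);
                groupBytes = 0;
                groupCount = 0;
            }
            groupBytes += length;
            groupCount++;
        }
        if (groupCount > 0) {
            groupSizes.add(groupCount);
        }
        return groupSizes;
    }

    public static void main(String[] args) {
        long maxCombinedSize = 128L * 1024 * 1024; // assumed 128 MB budget
        long mb64 = 64L * 1024 * 1024;

        // Four splits that all report zero bytes collapse into one group.
        System.out.println(combine(new long[] {0, 0, 0, 0}, maxCombinedSize));             // [4]
        // Four splits that report 64 MB each are packed two per group.
        System.out.println(combine(new long[] {mb64, mb64, mb64, mb64}, maxCombinedSize)); // [2, 2]
    }
}
```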

Hope somebody finds this useful, you wouldn't believe how many dead-ends I ran down trying to figure this out.

-Matt 


Re: Pig not reading all cassandra data

Posted by "Matthew E. Kennedy" <ma...@spadac.com>.
I noticed in the jobtracker log that when the pig job kicks off, I get the following info message:

2011-02-02 09:13:07,269 INFO org.apache.hadoop.mapred.JobInProgress: Input size for job job_201101241634_0193 = 0. Number of splits = 1

So I looked at the job.split file that is created for the Pig job and compared it to the job.split file created for the map-reduce job.  The map reduce file contains an entry for each split, whereas the  job.split file for the Pig job contains just the one split.

I added some logging to the ColumnFamilyInputFormat to print the input splits it creates for the Pig job, and the call to getSplits() appears to be returning the correct list of splits.  I can't figure out where it goes wrong, though, between that call and the splits being written to the job.split file.

Does anybody know the specific class responsible for creating that file in a Pig job, and why it might be affected by using the pig CassandraStorage module?

Is anyone else successfully running Pig jobs against a 0.7 cluster?

Thanks,
Matt