Posted to user@mahout.apache.org by Jeff Eastman <je...@Narus.com> on 2011/07/21 23:30:42 UTC

FW: meanshift reduce task problem

+dev
+user

r1149369 implements the previous MAHOUT-749 patch that introduces support for multiple reducers (specified by -Dmapred.reduce.tasks=N) for improved scalability beyond the default of 1. The heuristic sends the clusters produced by each mapper to 1 of N reducers in a round-robin fashion to intermix them with the results of the other mappers. In each iteration, N is reduced by one until a final reducer merges the results of all previous iterations.
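
Schematically, the mapper side of the heuristic amounts to the sketch below. The class and field names are illustrative and the merge/shift logic is elided; this is not the code committed in r1149369.

    // Sketch of the round-robin reducer assignment; illustrative names only.
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;

    public class RoundRobinCanopyMapper extends
        Mapper<WritableComparable<?>, MeanShiftCanopy, IntWritable, MeanShiftCanopy> {

      private int numReducers;
      private int next; // round-robin counter

      @Override
      protected void setup(Context context) {
        numReducers = Math.max(1, context.getNumReduceTasks());
      }

      @Override
      protected void map(WritableComparable<?> key, MeanShiftCanopy canopy,
          Context context) throws IOException, InterruptedException {
        // ... merge and shift the canopy as before ...
        // Cycle the output key over 0..N-1 so each of the N reducers
        // receives an interleaved slice of every mapper's clusters.
        context.write(new IntWritable(next), canopy);
        next = (next + 1) % numReducers;
      }
    }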

The patch also changes the representation of MeanShiftCanopies, introducing a new 'mass' field that tracks the number of bound points, which was formerly computed as needed from the boundPoints array itself. This allows the accumulation of bound points in the clusters to be omitted when the -cl option is not present in the CLI invocation, yielding a massive reduction in cluster sizes in situations where the clustered (classified) points are not being requested in the first place.
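
Conceptually, the mass change looks like the following fragment. The class shape and names are illustrative, not the actual MeanShiftCanopy source:

    // Illustrative only: a 'mass' counter replaces counting boundPoints on
    // demand, and boundPoints accumulation is skipped unless -cl is given.
    import java.util.ArrayList;
    import java.util.List;

    public class CanopySketch {
      private long mass = 1;             // number of bound points
      private List<Integer> boundPoints; // only populated when -cl is present
      private final boolean recordBoundPoints;

      public CanopySketch(boolean recordBoundPoints) {
        this.recordBoundPoints = recordBoundPoints;
        this.boundPoints = recordBoundPoints ? new ArrayList<Integer>() : null;
      }

      public void merge(CanopySketch other) {
        mass += other.mass; // O(1) regardless of cluster size
        if (recordBoundPoints) {
          // This is the growth that -cl now opts into explicitly.
          boundPoints.addAll(other.boundPoints);
        }
      }
    }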

Both of these enhancements need further testing to see how they really impact scalability of mean shift canopy clustering. I'm working on that now.

-----Original Message-----
From: Sengupta, Sohini IN BLR SISL [mailto:sohini.sengupta@siemens.com] 
Sent: Thursday, July 21, 2011 5:03 AM
To: Jeff Eastman
Cc: Sengupta, Sohini IN BLR SISL
Subject: RE: meanshift reduce task problem

Hi Jeff,
I have around 30 input files and am specifying 18 reducers.
Transcript output: on the terminal I keep seeing the job's progress, but after a few failures ("failed to report status for 601 seconds. Killing!") on the reducer with the maximum load, the whole job fails and I see a heap-size error in the window. Is that what you wanted to know? Please clarify.
Thanks again,
Regards,
Sohini

-----Original Message-----
From: Jeff Eastman [mailto:jeastman@Narus.com]
Sent: Wednesday, July 20, 2011 11:19 PM
To: Sengupta, Sohini IN BLR SISL
Subject: RE: meanshift reduce task problem

Hi Sohini,

I have not been able to test this at scale yet, so there may still be problems. Can you give me a little more information:
- how many input files do you have?
- how many reducers are you specifying?
- can you give me the transcript output?

Jeff

-----Original Message-----
From: Sengupta, Sohini IN BLR SISL [mailto:sohini.sengupta@siemens.com]
Sent: Wednesday, July 20, 2011 6:42 AM
To: Jeff Eastman
Cc: Sengupta, Sohini IN BLR SISL
Subject: RE: meanshift reduce task problem

Hi Jeff,
I have applied the patch you provided, but I still do not see the load being balanced evenly across all nodes. What am I doing wrong? Any suggestions or clues?
Thanks a lot in advance,
Regards,
Sohini

-----Original Message-----
From: Jeff Eastman [mailto:jeastman@Narus.com]
Sent: Thursday, June 30, 2011 9:35 PM
To: Sengupta, Sohini IN BLR SISL
Subject: RE: meanshift reduce task problem

Yes, see
     [ https://issues.apache.org/jira/browse/MAHOUT-749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]


-----Original Message-----
From: Sengupta, Sohini IN BLR SISL [mailto:sohini.sengupta@siemens.com]
Sent: Wednesday, June 29, 2011 10:54 PM
To: Jeff Eastman
Cc: user@mahout.apache.org
Subject: RE: meanshift reduce task problem

Hi Jeff,
Will you be providing the patch sometime soon?
Thanks and regards,
Sohini

-----Original Message-----
From: Jeff Eastman [mailto:jeastman@Narus.com]
Sent: Friday, June 24, 2011 11:34 PM
To: user@mahout.apache.org
Subject: RE: meanshift reduce task problem

I've got a simple patch in the works that modifies the driver and mapper to spray the mapper's output clusters across all the reducers (specified by -Dmapred.reduce.tasks=n). This will mix the mapper output sets and, hopefully, improve scalability by allowing multiple reducers to play. The patch also decreases the number of reducers by one in each iteration, resulting in a single reducer in the end game. I want to do some testing of this before I commit.
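
In driver terms, the iteration loop amounts to roughly this fragment (a sketch only; runIteration is a hypothetical helper, not real Mahout code):

    // Sketch of decreasing the reducer count each iteration; not the
    // actual MeanShiftCanopyDriver code.
    Configuration conf = new Configuration();
    int numReducers = conf.getInt("mapred.reduce.tasks", 1);
    int maxIterations = 10;
    boolean converged = false;
    for (int iter = 0; iter < maxIterations && !converged; iter++) {
      Job job = new Job(conf, "mean shift iteration " + iter);
      // One fewer reducer each pass, bottoming out at the single reducer
      // that merges everything in the end game.
      job.setNumReduceTasks(Math.max(1, numReducers - iter));
      // ... set mapper/reducer classes and input/output paths ...
      converged = runIteration(job); // hypothetical helper
    }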

-----Original Message-----
From: Jeff Eastman [mailto:jeastman@Narus.com]
Sent: Thursday, June 23, 2011 12:34 PM
To: user@mahout.apache.org
Subject: RE: meanshift reduce task problem

If you look at the MeanShiftCanopyMapper, you will notice that the mapper writes all merged canopy values to a single "canopy" key. Running this with multiple reducers will give the same performance as running with a single reducer, since only one reducer will get the values associated with that key. That is why the driver was setting numReduceTasks(1).
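
The root cause is Hadoop's default HashPartitioner, which routes every occurrence of a given key to the same partition, so a single constant "canopy" key pins all values to one reducer:

    // Hadoop's default HashPartitioner: with one constant key, every record
    // lands in the same partition no matter how many reducers are configured.
    public int getPartition(K key, V value, int numReduceTasks) {
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }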

I've thought a bit about how this might be improved. It is clearly a form of bottom-up hierarchical clustering algorithm:
- The input vector set is first converted into an equivalent set of 1-vector clusters by a map-only process that preserves the number of input files.
- Then the cluster set is partitioned into M mappers by Hadoop. Each mapper combines clusters which "touch" and shifts all clusters toward their local means, using T1 and T2 parameters as in Canopy. Presumably, each mapper's output set will be smaller than its input set due to the combination of touching clusters.
- Here's where the scalability breaks down: a single reducer is used to merge the mapper output sets into a single file, after which a single mapper and reducer complete the iterations.
- How can we utilize more reducers and thus keep the concurrency levels up?
-- We could imagine just using map-only processing, but this would keep the original input sets distinct. Each mapper set would converge internally but points close to each other in different mapper sets would never see each other and thus could never merge. Not workable; we need the input sets to be mixed in each iteration to avoid this.
-- If we knew we were going to have M mappers for an iteration, and we wanted to use R reducers, then one approach might be to randomly assign each mapper output cluster to one of the R reducers. This would mix up the mapper streams in each iteration and allow adjacent points to be combined across the mapper streams. If we simultaneously decreased the R value for each iteration (as the clusters merged) then finally there would be a single output file that had considered all of the input points.
- Of course, there is another scalability limitation in the current implementation: each cluster contains the clusterIds of all the other clusters which it has consumed. Thus the size of each cluster will grow monotonically and at some point will outgrow the heap. This could be handled by writing out merge records during processing and then combining them back to determine the point->cluster mapping. This is likely not necessary until the first limitation is addressed, but could be done independently.
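
The merge records in the last point could be resolved after the fact with something like this sketch (illustrative only; nothing like this is implemented yet):

    // Illustrative sketch of resolving merge records into a final
    // point -> cluster mapping.
    import java.util.HashMap;
    import java.util.Map;

    public class MergeRecordResolver {
      // consumed cluster id -> id of the cluster that absorbed it
      private final Map<Integer, Integer> mergedInto =
          new HashMap<Integer, Integer>();

      public void recordMerge(int consumedId, int survivorId) {
        mergedInto.put(consumedId, survivorId);
      }

      // Follow the merge chain until reaching a cluster that was never
      // consumed; that is the final cluster for the original id.
      public int resolve(int clusterId) {
        Integer next = mergedInto.get(clusterId);
        while (next != null) {
          clusterId = next;
          next = mergedInto.get(clusterId);
        }
        return clusterId;
      }
    }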

I'm open to other ways of thinking about the scalability of mean shift. Comments would be appreciated.



-----Original Message-----
From: Sengupta, Sohini IN BLR SISL [mailto:sohini.sengupta@siemens.com]
Sent: Thursday, June 23, 2011 5:59 AM
To: user@mahout.apache.org
Subject: FW: meanshift reduce task problem

Any inputs will be helpful.
Thanks

________________________________
From: Sengupta, Sohini IN BLR SISL
Sent: Wednesday, June 22, 2011 5:15 PM
To: user@mahout.apache.org
Cc: Sengupta, Sohini IN BLR SISL
Subject: meanshift reduce task problem

Hi,

I have programmatically specified setNumReduceTasks(16) in MeanShiftCanopyDriver.java. On execution the number of reducers is set correctly (i.e., 16, as visible on the jobtracker screen), but on digging deeper I see that one node has the maximum number of bytes to process while the load is nominal for the rest of the nodes. Hence the reduce phase is very slow after 98% completion.

I am trying this on a cluster of 18 nodes. I also see that the load is distributed evenly in the map phase but not in the reduce phase. This happens on both the 0.4 and 0.5 versions of Mahout. Has anyone faced such a problem, and how does one get around it?
Thanks a lot in advance,
Sohini


RE: meanshift reduce task problem

Posted by "Sengupta, Sohini IN BLR SISL" <so...@siemens.com>.
Hi Jeff,
I tried running this on the synthetic_control dataset. I see the load being balanced across the reducers now, but the job stops after multiple failures with the following message:

at com.google.common.base.Preconditions.checkArgument(Preconditions.java:115)
        at org.apache.mahout.math.VectorWritable.readFields(VectorWritable.java:81)
        at org.apache.mahout.clustering.AbstractCluster.readFields(AbstractCluster.java:228)
        at org.apache.mahout.clustering.DistanceMeasureCluster.readFields(DistanceMeasureCluster.java:61)
        at org.apache.mahout.clustering.kmeans.Cluster.readFields(Cluster.java:70)
        at org.apache.mahout.clustering.meanshift.MeanShiftCanopy.readFields(MeanShiftCanopy.java:129)
        at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1751)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1879)
        at org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator.computeNext(SequenceFileValueIterator.java:76)
        at org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator.computeNext(SequenceFileValueIterator.java:35)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
        at com.google.common.collect.Iterators$5.hasNext(Iterators.java:525)
        at com.google.common.collect.ForwardingIterator.hasNext(ForwardingIterator.java:43)
        at org.apache.mahout.clustering.meanshift.MeanShiftCanopyClusterMapper.getCanopies(MeanShiftCanopyClusterMapper.java:65)
        at org.apache.mahout.clustering.meanshift.MeanShiftCanopyClusterMapper.setup(MeanShiftCanopyClusterMapper.java:57)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)


Thanks and regards,
Sohini