Posted to user@mahout.apache.org by Anna Lahoud <an...@gmail.com> on 2012/08/17 19:09:10 UTC

Re: More mappers in RowId

I have spent some time trying out the RowID mod that Dan posted. But I am
not seeing any mappers run to produce the output. It does produce multiple
outputs, but the work appears to be done locally.

Is there another piece to the puzzle that I need to distribute this job to
the Hadoop nodes as a map task?


On Tue, Jun 5, 2012 at 11:13 AM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> RowID+RowSimilarity are by far the slowest of the steps in my particular
> pipeline, which includes crawl, seq2sparse, and cluster.
>
> What is the status of more parallelization of these? Dan offers a mod. Has
> this been tested with rowsimilarity?
>
> I'd be happy to give it a try but am having trouble with clustering in the
> trunk right now.
>
> On 6/4/12 4:20 AM, DAN HELM wrote:
>
>>   I also developed a quick and dirty rowid mod so that it generates one
>> matrix output file for each "part" input file from the seq2sparse command,
>> so that multiple mappers are used to process the output from rowid (such as
>> when running CVB clustering on rowid output).  For that I just made a
>> derivative rowid command called rowiddist.  The final solution should be
>> logic rolled into a single rowid command, where the run type (multiple file
>> output vs single matrix file) is driven by parameters.  The code below was
>> just a quick "solution".
>>   **** code follows ****
>> package home.mahout;
>>
>> import com.google.common.io.Closeables;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileStatus;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.IntWritable;
>> import org.apache.hadoop.io.SequenceFile;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.util.ToolRunner;
>> import org.apache.mahout.common.AbstractJob;
>> import org.apache.mahout.common.Pair;
>> import org.apache.mahout.common.iterator.sequencefile.PathFilters;
>> import org.apache.mahout.common.iterator.sequencefile.PathType;
>> import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
>> import org.apache.mahout.math.VectorWritable;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.Map;
>>
>> public class RowIdJobDistributed extends AbstractJob {
>>
>>   private static final Logger log =
>>       LoggerFactory.getLogger(RowIdJobDistributed.class);
>>
>>   @Override
>>   public int run(String[] args) throws Exception {
>>     addInputOption();
>>     addOutputOption();
>>     Map<String,String> parsedArgs = parseArguments(args);
>>     if (parsedArgs == null) {
>>       return -1;
>>     }
>>
>>     Configuration conf = getConf();
>>     FileSystem fs = FileSystem.get(conf);
>>     Path outputPath = getOutputPath();
>>     Path indexPath = new Path(outputPath, "docIndex");
>>
>>     // One shared index (row id -> original document key) for all matrix files.
>>     SequenceFile.Writer indexWriter =
>>         SequenceFile.createWriter(fs, conf, indexPath,
>>                                   IntWritable.class, Text.class);
>>     try {
>>       FileStatus[] fstatus = fs.listStatus(getInputPath());
>>       IntWritable docId = new IntWritable();
>>       int i = 0;            // running row id across all input files
>>       int numCols = 0;
>>       int matrixCount = 0;  // suffix of the next matrix-N output file
>>
>>       // This loop runs locally; each input "part" file is rewritten
>>       // as its own matrix-N file.
>>       for (FileStatus f : fstatus) {
>>         if (f.isDir()) {
>>           continue;
>>         }
>>         Path inPath = f.getPath();
>>         Path matrixPath = new Path(outputPath, "matrix-" + matrixCount);
>>         SequenceFile.Writer matrixWriter =
>>             SequenceFile.createWriter(fs, conf, matrixPath,
>>                                       IntWritable.class, VectorWritable.class);
>>         try {
>>           for (Pair<Text,VectorWritable> record :
>>                new SequenceFileDirIterable<Text,VectorWritable>(
>>                    inPath, PathType.LIST, PathFilters.logsCRCFilter(),
>>                    null, true, conf)) {
>>             VectorWritable value = record.getSecond();
>>             docId.set(i);
>>             indexWriter.append(docId, record.getFirst());
>>             matrixWriter.append(docId, value);
>>             i++;
>>             numCols = value.get().size();
>>           }
>>         } finally {
>>           // Close the per-file writer even if a record fails to append.
>>           Closeables.closeQuietly(matrixWriter);
>>         }
>>         matrixCount++;
>>       }
>>       log.info("Wrote out matrix with {} rows and {} columns to {}",
>>                new Object[] { i, numCols, outputPath });
>>       return 0;
>>     } finally {
>>       Closeables.closeQuietly(indexWriter);
>>     }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     ToolRunner.run(new RowIdJobDistributed(), args);
>>   }
>> }
>>
>>
>>
>> ________________________________
>> From: Suneel Marthi <suneel_marthi@yahoo.com>
>> To: "user@mahout.apache.org" <us...@mahout.apache.org>
>> Sent: Monday, June 4, 2012 1:39 AM
>> Subject: Questions on RowId job
>>
>>
>> 3. There was an issue that was brought up on these forums sometime last
>> week to which Jake's response was that it would be a welcome patch to be
>> able to configure the RowId Job to run in a distributed mode with many
>> mappers. I agree that this is a useful change. This change means that the
>> RowSimilarity Job, which takes as inputs the matrix and the number of
>> columns generated by the RowId job, needs to be tested to ensure that
>> nothing's broken.
>>
>> Should I go ahead and open a Jira for the above issues?
>>
>>
>> Regards,
>> Suneel
>>
>

Re: More mappers in RowId

Posted by Anna Lahoud <an...@gmail.com>.
Thank you Dan. I understand that having multiple rowid files for the next
step(s) is the key. In my testing though, no matter how I slice it up, I am
unable to get the rowsimilarity job to complete on my inputs. It easily
completes the first phase of the job with my multiple (100) matrix files.
But the second phase of row similarity slows down until the Hadoop
timeout is issued for one or more mappers and the entire job fails. I was
even able to get the mappers to report 100% on my latest down-sized job.
But then the logs show that the merge phases of the mappers begin to stall,
and eventually they stop reporting progress for long enough that the map
task fails. At that point, the task is returned to the job tracker, the job
status falls back to partially complete, and that mapper's work is
reassigned to another mapper. The rest is a series of similar failures until
the job is reported as a total failure and returns.

I have been downsizing my job for a week and am wondering if anyone can
give me any guidance as to how to set up a job for success. I'm curious if
there are parameters that we know correspond to the numbers of comparisons,
the maximum cardinality and records to use in a job, or the size of memory
used, or settings that will assist in creating a successful job. I set the
latest job up to exclude self comparison, use a threshold of 0.75, and use
a maximum number of similarities per row of 10.
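
For anyone reading along who has not used these three settings, here is a
toy in-memory sketch of what they control. It is not Mahout's
RowSimilarityJob code; the class and method names (SimilaritySettingsSketch,
topSimilarRows) are made up for illustration, and cosine similarity on dense
rows is only an assumption to keep the example small.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; not part of Mahout.
public class SimilaritySettingsSketch {

  /** Cosine similarity of two dense rows of equal length. */
  static double cosine(double[] a, double[] b) {
    double dot = 0, normA = 0, normB = 0;
    for (int k = 0; k < a.length; k++) {
      dot += a[k] * b[k];
      normA += a[k] * a[k];
      normB += b[k] * b[k];
    }
    return (normA == 0 || normB == 0) ? 0 : dot / Math.sqrt(normA * normB);
  }

  /**
   * Returns the indices of the rows most similar to rows[row], applying the
   * three settings: skip the row itself, drop pairs below the threshold,
   * and keep at most maxPerRow results (most similar first).
   */
  static List<Integer> topSimilarRows(double[][] rows, int row,
                                      boolean excludeSelf, double threshold,
                                      int maxPerRow) {
    double[] sims = new double[rows.length];
    List<Integer> candidates = new ArrayList<>();
    for (int j = 0; j < rows.length; j++) {
      if (excludeSelf && j == row) {
        continue;
      }
      sims[j] = cosine(rows[row], rows[j]);
      if (sims[j] >= threshold) {
        candidates.add(j);
      }
    }
    // Sort by descending similarity, then truncate to the per-row maximum.
    candidates.sort(Comparator.comparingDouble((Integer j) -> -sims[j]));
    int keep = Math.min(maxPerRow, candidates.size());
    return new ArrayList<>(candidates.subList(0, keep));
  }

  public static void main(String[] args) {
    double[][] rows = { {1, 0, 0}, {1, 0.1, 0}, {0, 1, 0}, {1, 1, 0} };
    // Row 3 has similarity ~0.707 to row 0, so a 0.75 threshold drops it.
    System.out.println(topSimilarRows(rows, 0, true, 0.75, 10)); // prints [1]
  }
}
```

In this toy version a higher threshold and a smaller per-row maximum only
shrink the output; every pair is still compared, so the number of
comparisons is unchanged.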

Once the first mapper fails during the merge pass, the other successful
mappers eventually fail as well, perhaps waiting to combine data with the
failed mapper.

Any guidance at all would be greatly appreciated. I just want to find that
'threshold' of performance where I can task it but it will succeed. Thank
you.

Anna

On Fri, Aug 17, 2012 at 5:21 PM, DAN HELM <da...@verizon.net> wrote:

> Hi Anna,
>
> When I wrote that mod it was specifically so that rowid would output
> multiple files (instead of one Matrix file).  This was done so the LDA
> clustering I was using would run multiple mappers.  The rowid code
> (including the mod) still just runs locally.  I have not done much with it
> as of late but had modified the code I posted earlier to support another
> parameter (-m) enabling one to split the data up even more by max number of
> vectors per output file so that lots of mappers could run for a program
> like LDA.  Setting -m to -1 makes rowid work the original way (one output
> Matrix file).
>
> Dan
>

Re: More mappers in RowId

Posted by DAN HELM <da...@verizon.net>.
Hi Anna,
 
When I wrote that mod it was specifically so that rowid would output multiple files (instead of one Matrix file).  This was done so the LDA clustering I was using would run multiple mappers.  The rowid code (including the mod) still just runs locally.  I have not done much with it as of late but had modified the code I posted earlier to support another parameter (-m) enabling one to split the data up even more by max number of vectors per output file so that lots of mappers could run for a program like LDA.  Setting -m to -1 makes rowid work the original way (one output Matrix file).
 
Dan
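
The -m behaviour described above can be sketched roughly as follows. The
modified job itself was not posted to the list, so this is only a guess at
the splitting rule, with hypothetical names (MaxVectorsSplitSketch,
assignFiles). It ignores Hadoop and the per-"part" file boundaries and just
shows which matrix-N file each consecutive row id would land in.

```java
import java.util.Arrays;

// Hypothetical illustration only; the actual -m patch is not shown in this thread.
public class MaxVectorsSplitSketch {

  /**
   * For rowCount consecutive row ids, returns the index N of the matrix-N
   * output file each row would be written to. A maxVectorsPerFile of -1
   * means "no splitting": every row goes to file 0.
   */
  static int[] assignFiles(int rowCount, int maxVectorsPerFile) {
    if (maxVectorsPerFile == 0 || maxVectorsPerFile < -1) {
      throw new IllegalArgumentException("max must be -1 or a positive count");
    }
    int[] fileOfRow = new int[rowCount];
    int fileIndex = 0;
    int writtenToCurrent = 0;
    for (int row = 0; row < rowCount; row++) {
      // In a real job this is where the current SequenceFile.Writer would be
      // closed and a new one opened for the next matrix-N path.
      if (maxVectorsPerFile != -1 && writtenToCurrent == maxVectorsPerFile) {
        fileIndex++;
        writtenToCurrent = 0;
      }
      fileOfRow[row] = fileIndex;
      writtenToCurrent++;
    }
    return fileOfRow;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(assignFiles(5, 2)));  // prints [0, 0, 1, 1, 2]
    System.out.println(Arrays.toString(assignFiles(5, -1))); // prints [0, 0, 0, 0, 0]
  }
}
```

More, smaller output files give a downstream job more inputs to assign to
mappers, but the splitting itself still happens in the local rowid process.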
 
