Posted to dev@mahout.apache.org by DAN HELM <da...@verizon.net> on 2012/07/21 03:32:46 UTC

rowid modification...

Hello,
 
I have been using Mahout for the last couple of months, primarily focusing on document clustering (CVB in particular), and have gotten great advice on the user forum (Jake, Andy, etc.).
 
I was using rowid to change the keys in our sparse vectors (initially generated by seq2sparse) from Text to Integer, as required by the CVB algorithm.  I ran into a problem with the current rowid program: it generates a single output file, so only one mapper runs for CVB.  I created a derivative rowid that takes a new parameter, "-m", which lets one specify the maximum number of vectors to write to each part file, resulting in multiple files and hence multiple mappers for CVB.
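 
For illustration, an invocation might look something like the sketch below (the paths and the 10000 cap are made-up example values, not from a real run):
 
  // Hypothetical invocation; input/output paths and the "-m" value are examples only.
  String[] jobArgs = {
      "-i", "/user/dan/tfidf-vectors",  // Text/VectorWritable pairs from seq2sparse
      "-o", "/user/dan/matrix",         // becomes part-00000, part-00001, ... plus docIndex
      "-m", "10000"                     // at most 10000 vectors per output part file
  };
  ToolRunner.run(new RowIdJobDistributed(), jobArgs);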
 
I recall reading in the Mahout development guidelines that one should first discuss potential contributions with the relevant forum members.  It is not a lot of code, so I am including it below.  I am aware that contributed software should be thoroughly tested and include documentation, JUnit tests, etc., but I am curious whether this kind of modification (with further revisions/enhancements) would be a worthwhile contribution.
 
Thanks, Dan
 
--------------------
 
package home.mahout;

import com.google.common.io.Closeables;
import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
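
/**
 * A derivative of Mahout's RowIdJob that converts (Text, VectorWritable)
 * input (e.g. seq2sparse output) into (IntWritable, VectorWritable) matrix
 * part files, plus a "docIndex" file mapping each integer row id back to
 * its original Text key. The optional -m/--maxRecords flag caps the number
 * of vectors written per part file, so that downstream jobs such as CVB
 * get more than one input split and hence more than one mapper.
 */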
public class RowIdJobDistributed extends AbstractJob {
  private static final Logger log = LoggerFactory.getLogger(RowIdJobDistributed.class);

  @Override
  public int run(String[] args) throws Exception {
    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    ArgumentBuilder abuilder = new ArgumentBuilder();
    GroupBuilder gbuilder = new GroupBuilder(); 
    
    Option inputDirOpt = DefaultOptionCreator.inputOption().create();
    Option outputDirOpt = DefaultOptionCreator.outputOption().create();
    
    // The new option: caps how many vectors are written to each output "matrix" part file.
    Option maxRecordsOpt = obuilder.withLongName("maxRecords")
        .withArgument(abuilder.withName("maxRecords").withMinimum(1).withMaximum(1).create())
        .withDescription("(Optional) Maximum number of records to write to an output \"matrix\" file. Default Value: -1")
        .withShortName("m").create();
    
    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
        .create();
    
    Group group = gbuilder.withName("Options").withOption(maxRecordsOpt)
        .withOption(outputDirOpt).withOption(inputDirOpt)
        .withOption(helpOpt)
        .create();
    
    SequenceFile.Writer matrixWriter = null;
    SequenceFile.Writer indexWriter = null;
    
    try {
      Parser parser = new Parser();
      parser.setGroup(group);
      parser.setHelpOption(helpOpt);
      CommandLine cmdLine = parser.parse(args);
      
      if (cmdLine.hasOption(helpOpt)) {
        CommandLineUtil.printHelp(group);
        return -1;
      }    
      
      Path inputPath = new Path((String) cmdLine.getValue(inputDirOpt));
      Path outputPath = new Path((String) cmdLine.getValue(outputDirOpt));
      
      int maxRecords = -1; // default: write a single "matrix" file, as the original RowIdJob does
      if (cmdLine.hasOption(maxRecordsOpt)) {
        String maxRecordsString = (String) cmdLine.getValue(maxRecordsOpt);
        maxRecords = Integer.parseInt(maxRecordsString);
        if (maxRecords < 1) {
          // Guard against "-m 0", which would cause division by zero in (i % maxRecords) below.
          log.warn("Ignoring non-positive maxRecords value {}; writing a single matrix file", maxRecords);
          maxRecords = -1;
        }
      }
  
      Configuration conf = getConf();
      FileSystem fs = FileSystem.get(conf);
  
      // The docIndex output maps each generated integer row id back to the
      // original Text key (document id).
      Path indexPath = new Path(outputPath, "docIndex");
  
      indexWriter = SequenceFile.createWriter(fs,
                                              conf,
                                              indexPath,
                                              IntWritable.class,
                                              Text.class);
      FileStatus[] fstatus = fs.listStatus(inputPath);
      IntWritable docId = new IntWritable(); // reused writable for the integer row id
      int numCols = 0;                       // vector cardinality, for the final log message
      int matrixCount = 0;                   // numbers the output "matrix" part files
      Path matrixPath = null;
      int totalRecords = 0;                  // global row id across all input files
      
      for (FileStatus f : fstatus) {
        if (f.isDir()) {
          continue;
        }
        int i = 0; // records read from the current input file; drives part-file rollover
        Path inPath = f.getPath();
        // Only process part-* files; skip _SUCCESS, _logs and the like.
        if (!inPath.toString().contains("part-")) {
          continue;
        }
     
        for (Pair<Text,VectorWritable> record :
             new SequenceFileDirIterable<Text,VectorWritable>(inPath,
                                                              PathType.LIST,
                                                              PathFilters.logsCRCFilter(),
                                                              null,
                                                              true,
                                                              conf)) {  
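          // Roll over to a new "matrix" part file either on the very first
          // record (default single-file mode, maxRecords == -1) or every
          // maxRecords records. Since i resets per input file, an output
          // part never spans two input files.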
          if ((maxRecords == -1 && matrixCount == 0) || (maxRecords != -1 && (i % maxRecords) == 0)) {
            if (matrixWriter != null) {
              Closeables.closeQuietly(matrixWriter);
            }
            matrixPath = new Path(outputPath, String.format("part-%05d", matrixCount));
        
            matrixWriter = SequenceFile.createWriter(fs,
                                                     conf,
                                                     matrixPath,
                                                     IntWritable.class,
                                                     VectorWritable.class);
            matrixCount++;
          }          
          
          VectorWritable value = record.getSecond();
          docId.set(totalRecords);
          indexWriter.append(docId, record.getFirst());
          matrixWriter.append(docId, value);
          i++;
          totalRecords++;
          // Every vector has the same cardinality; the last one seen supplies
          // the column count for the final log message.
          numCols = value.get().size();
        }
      }
      
      log.info("Wrote out matrix with {} rows and {} columns to {}", new Object[] { totalRecords, numCols, outputPath });
      return 0;
    } finally {
      Closeables.closeQuietly(matrixWriter);   
      Closeables.closeQuietly(indexWriter);
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new RowIdJobDistributed(), args);
  }
}
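 
--------------------
 
In case it is useful, here is a quick snippet (hypothetical, just a sketch using the example output path from above) for sanity-checking the docIndex output; it dumps the row id to key mapping:
 
  // Hypothetical sanity check, separate from the job itself: dump the
  // (IntWritable row id, Text key) pairs from the docIndex file.
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  SequenceFile.Reader reader =
      new SequenceFile.Reader(fs, new Path("/user/dan/matrix/docIndex"), conf);
  try {
    IntWritable rowId = new IntWritable();
    Text docKey = new Text();
    while (reader.next(rowId, docKey)) {
      System.out.println(rowId.get() + "\t" + docKey);
    }
  } finally {
    reader.close();
  }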