You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mahout.apache.org by DAN HELM <da...@verizon.net> on 2012/07/21 03:32:46 UTC
rowid modification...
Hello,
I have been using Mahout for the last couple of months, primarily focusing on document clustering (CVB in particular), and have gotten great advice on the user forum (Jake, Andy, etc).
I was using rowid to change the keys in our sparse vectors (initially generated by seq2sparse) from Text to Integer as required by the CVB algorithm. I did encounter a problem with the current rowid program, where it would generate one file resulting in only one mapper running for CVB. I created a derivative rowid that takes new parameter "-m" enabling one to specify how many vectors (max) to write to a part file, resulting in multiple files and mappers for CVB in this case.
I recall reading on the Mahout development forum guidelines that one should first discuss with relevant forum members their thoughts on potential contributions. It was not a lot of code and I am including it below. I am aware that contributed software should be thoroughly tested, include documentation, junit tests, etc. but am curious if this kind of mod (including with further revisions/enhancements) would be beneficial for contribution.
Thanks, Dan
--------------------
package home.mahout;
import com.google.common.io.Closeables;
import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Variant of Mahout's {@code RowIdJob} that converts Text-keyed sparse vectors
 * (e.g. the output of seq2sparse) into IntWritable-keyed vectors as required by
 * CVB, while optionally splitting the output "matrix" into multiple part files
 * (via the {@code -m/--maxRecords} option) so that downstream jobs such as CVB
 * can schedule more than one mapper.
 *
 * <p>Outputs, under the output directory:
 * <ul>
 *   <li>{@code docIndex} — sequence file mapping IntWritable row id to the
 *       original Text document key;</li>
 *   <li>{@code part-NNNNN} — one or more sequence files mapping IntWritable
 *       row id to VectorWritable.</li>
 * </ul>
 */
public class RowIdJobDistributed extends AbstractJob {
  private static final Logger log = LoggerFactory.getLogger(RowIdJobDistributed.class);

  /**
   * Runs the conversion.
   *
   * @param args command line: {@code -i <input>} {@code -o <output>}
   *             {@code [-m <maxRecords>]} {@code [-h]}. {@code maxRecords} is the
   *             maximum number of vectors written per output matrix file; the
   *             default of -1 means "write a single matrix file", matching the
   *             original RowIdJob behavior.
   * @return 0 on success, -1 when help was printed
   * @throws Exception on HDFS or parse errors
   */
  @Override
  public int run(String[] args) throws Exception {
    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
    ArgumentBuilder abuilder = new ArgumentBuilder();
    GroupBuilder gbuilder = new GroupBuilder();
    Option inputDirOpt = DefaultOptionCreator.inputOption().create();
    Option outputDirOpt = DefaultOptionCreator.outputOption().create();
    Option maxRecordsOpt = obuilder.withLongName("maxRecords").withArgument(
        abuilder.withName("maxRecords").withMinimum(1).withMaximum(1).create()).withDescription(
        "(Optional) Maximum number of records to write to an output \"matrix\" file. Default Value: -1").withShortName("m").create();
    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
        .create();
    Group group = gbuilder.withName("Options").withOption(maxRecordsOpt)
        .withOption(outputDirOpt).withOption(inputDirOpt)
        .withOption(helpOpt)
        .create();
    SequenceFile.Writer matrixWriter = null;
    SequenceFile.Writer indexWriter = null;
    try {
      Parser parser = new Parser();
      parser.setGroup(group);
      parser.setHelpOption(helpOpt);
      CommandLine cmdLine = parser.parse(args);
      if (cmdLine.hasOption(helpOpt)) {
        CommandLineUtil.printHelp(group);
        return -1;
      }
      Path inputPath = new Path((String) cmdLine.getValue(inputDirOpt));
      Path outputPath = new Path((String) cmdLine.getValue(outputDirOpt));
      // Default of -1 means to write a single "matrix" file, as the original RowIdJob does.
      int maxRecords = -1;
      if (cmdLine.hasOption(maxRecordsOpt)) {
        String maxRecordsString = (String) cmdLine.getValue(maxRecordsOpt);
        maxRecords = Integer.parseInt(maxRecordsString);
        // Guard against "-m 0" (would cause division by zero below) and other
        // nonsensical values; only -1 (single file) or a positive cap is valid.
        if (maxRecords != -1 && maxRecords < 1) {
          throw new IllegalArgumentException(
              "maxRecords must be -1 (single output file) or a positive integer, got: " + maxRecords);
        }
      }
      Configuration conf = getConf();
      FileSystem fs = FileSystem.get(conf);
      Path indexPath = new Path(outputPath, "docIndex");
      indexWriter = SequenceFile.createWriter(fs,
                                              conf,
                                              indexPath,
                                              IntWritable.class,
                                              Text.class);
      FileStatus fstatus[] = fs.listStatus(inputPath);
      IntWritable docId = new IntWritable();
      int numCols = 0;       // column count of the most recently seen vector
      int matrixCount = 0;   // number of matrix part files created so far
      Path matrixPath = null;
      int totalRecords = 0;  // global row counter; also drives output-file rotation
      for (FileStatus f : fstatus) {
        if (f.isDir()) {
          continue;
        }
        Path inPath = f.getPath();
        // Only consume the MapReduce "part-*" outputs; skip _SUCCESS, _logs, etc.
        if (!inPath.toString().contains("part-")) {
          continue;
        }
        for (Pair<Text,VectorWritable> record :
             new SequenceFileDirIterable<Text,VectorWritable>(inPath,
                                                              PathType.LIST,
                                                              PathFilters.logsCRCFilter(),
                                                              null,
                                                              true,
                                                              conf)) {
          // Lazily create the first matrix writer, then rotate to a new file
          // whenever the current one has reached maxRecords rows. Rotation is
          // based on the GLOBAL record count so output files are uniformly
          // filled even when the input is spread over many small part files.
          if (matrixWriter == null || (maxRecords != -1 && totalRecords % maxRecords == 0)) {
            if (matrixWriter != null) {
              Closeables.closeQuietly(matrixWriter);
            }
            matrixPath = new Path(outputPath, String.format("part-%05d", matrixCount));
            matrixWriter = SequenceFile.createWriter(fs,
                                                     conf,
                                                     matrixPath,
                                                     IntWritable.class,
                                                     VectorWritable.class);
            matrixCount++;
          }
          VectorWritable value = record.getSecond();
          docId.set(totalRecords);
          indexWriter.append(docId, record.getFirst());
          matrixWriter.append(docId, value);
          totalRecords++;
          numCols = value.get().size();
        }
      }
      log.info("Wrote out matrix with {} rows and {} columns to {}",
               new Object[] { totalRecords, numCols, outputPath });
      return 0;
    } finally {
      // closeQuietly tolerates null, so these are safe even if we bailed early.
      Closeables.closeQuietly(matrixWriter);
      Closeables.closeQuietly(indexWriter);
    }
  }

  public static void main(String[] args) throws Exception {
    // Propagate the job's exit status to the shell (the original discarded it,
    // so failures looked like success to calling scripts).
    System.exit(ToolRunner.run(new RowIdJobDistributed(), args));
  }
}