You are viewing a plain-text version of this content. (The hyperlink to the canonical version, originally attached to the word "here", was not preserved in this plain-text copy.)
Posted to commits@mahout.apache.org by ss...@apache.org on 2011/08/23 11:13:03 UTC

svn commit: r1160591 - /mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java

Author: ssc
Date: Tue Aug 23 09:13:03 2011
New Revision: 1160591

URL: http://svn.apache.org/viewvc?rev=1160591&view=rev
Log:
MAHOUT-777 Improve TransposeJob to use a Combiner

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=1160591&r1=1160590&r2=1160591&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java Tue Aug 23 09:13:03 2011
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -42,10 +43,8 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-/**
- * TODO: rewrite to use helpful combiner.
- */
 public class TransposeJob extends AbstractJob {
+
   public static final String NUM_ROWS_KEY = "SparseRowMatrix.numRows";
 
   public static void main(String[] args) throws Exception {
@@ -59,16 +58,13 @@ public class TransposeJob extends Abstra
     addOption("numCols", "nc", "Number of columns of the input matrix");
     Map<String,String> parsedArgs = parseArguments(strings);
     if (parsedArgs == null) {
-      // FIXME
-      return 0;
+      return -1;
     }
 
-    Path inputPath = getInputPath();
-    Path outputTmpPath = new Path(parsedArgs.get("--tempDir"));
     int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
     int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
 
-    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
+    DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), getTempPath(), numRows, numCols);
     matrix.setConf(new Configuration(getConf()));
     matrix.transpose();
 
@@ -96,9 +92,10 @@ public class TransposeJob extends Abstra
     conf.setInputFormat(SequenceFileInputFormat.class);
     FileOutputFormat.setOutputPath(conf, matrixOutputPath);
     conf.setMapperClass(TransposeMapper.class);
-    conf.setReducerClass(TransposeReducer.class);
     conf.setMapOutputKeyClass(IntWritable.class);
-    conf.setMapOutputValueClass(DistributedRowMatrix.MatrixEntryWritable.class);
+    conf.setMapOutputValueClass(VectorWritable.class);
+    conf.setCombinerClass(MergeVectorsCombiner.class);
+    conf.setReducerClass(MergeVectorsReducer.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
     conf.setOutputKeyClass(IntWritable.class);
     conf.setOutputValueClass(VectorWritable.class);
@@ -106,49 +103,62 @@ public class TransposeJob extends Abstra
   }
 
   public static class TransposeMapper extends MapReduceBase
-      implements Mapper<IntWritable,VectorWritable,IntWritable,DistributedRowMatrix.MatrixEntryWritable> {
+      implements Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int newNumCols;
 
     @Override
-    public void map(IntWritable r,
-                    VectorWritable v,
-                    OutputCollector<IntWritable, DistributedRowMatrix.MatrixEntryWritable> out,
-                    Reporter reporter) throws IOException {
-      DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
-      Iterator<Vector.Element> it = v.get().iterateNonZero();
+    public void configure(JobConf conf) {
+      newNumCols = conf.getInt(NUM_ROWS_KEY, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void map(IntWritable r, VectorWritable v, OutputCollector<IntWritable, VectorWritable> out,
+        Reporter reporter) throws IOException {
       int row = r.get();
-      entry.setCol(row);
-      entry.setRow(-1);  // output "row" is captured in the key
+      Iterator<Vector.Element> it = v.get().iterateNonZero();
       while (it.hasNext()) {
         Vector.Element e = it.next();
+        RandomAccessSparseVector tmp = new RandomAccessSparseVector(newNumCols, 1);
+        tmp.setQuick(row, e.get());
         r.set(e.index());
-        entry.setVal(e.get());
-        out.collect(r, entry);
+        out.collect(r, new VectorWritable(tmp));
       }
     }
   }
 
-  public static class TransposeReducer extends MapReduceBase
-      implements Reducer<IntWritable,DistributedRowMatrix.MatrixEntryWritable,IntWritable,VectorWritable> {
+  static Vector merge(Iterator<VectorWritable> vectors) {
+    Vector accumulator = vectors.next().get();
+    while (vectors.hasNext()) {
+      VectorWritable v = vectors.next();
+      if (v != null) {
+        Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
+        while (nonZeroElements.hasNext()) {
+          Vector.Element nonZeroElement = nonZeroElements.next();
+          accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
+        }
+      }
+    }
+    return accumulator;
+  }
 
-    private int newNumCols;
+  public static class MergeVectorsCombiner extends MapReduceBase
+        implements Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
 
     @Override
-    public void configure(JobConf conf) {
-      newNumCols = conf.getInt(NUM_ROWS_KEY, Integer.MAX_VALUE);
+    public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
+        OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
+      out.collect(key, new VectorWritable(merge(vectors)));
     }
+  }
+
+  public static class MergeVectorsReducer extends MapReduceBase
+        implements Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
 
     @Override
-    public void reduce(IntWritable outRow,
-                       Iterator<DistributedRowMatrix.MatrixEntryWritable> it,
-                       OutputCollector<IntWritable, VectorWritable> out,
-                       Reporter reporter) throws IOException {
-      RandomAccessSparseVector tmp = new RandomAccessSparseVector(newNumCols, 100);
-      while (it.hasNext()) {
-        DistributedRowMatrix.MatrixEntryWritable e = it.next();
-        tmp.setQuick(e.getCol(), e.getVal());
-      }
-      SequentialAccessSparseVector outVector = new SequentialAccessSparseVector(tmp);
-      out.collect(outRow, new VectorWritable(outVector));
+    public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
+        OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
+      out.collect(key, new VectorWritable(new SequentialAccessSparseVector(merge(vectors))));
     }
   }
 }