Posted to commits@mahout.apache.org by ss...@apache.org on 2011/09/09 09:45:17 UTC

svn commit: r1167027 [1/2] - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/cf/taste/hadoop/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ core/src/main/java/org/...

Author: ssc
Date: Fri Sep  9 07:45:16 2011
New Revision: 1167027

URL: http://svn.apache.org/viewvc?rev=1167027&view=rev
Log:
MAHOUT-767 Improve RowSimilarityJob performance

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
      - copied, changed from r1164967, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
      - copied, changed from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java
      - copied, changed from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CityBlockSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CooccurrenceCountSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CosineSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/EuclideanDistanceSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/LoglikelihoodSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/PearsonCorrelationSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/TanimotoCoefficientSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasure.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasures.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducerTest.java
      - copied, changed from r1164967, mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducerTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/Cooccurrence.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityMatrixEntryKey.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/SimilarityType.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrence.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedOccurrenceArray.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/WeightedRowPair.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/PredictionJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducerTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/graph/linkanalysis/PageRankJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/vector/
    mahout/trunk/integration/src/test/java/org/apache/mahout/utils/eval/ParallelFactorizationEvaluatorTest.java
    mahout/trunk/math/src/main/java/org/apache/mahout/math/AbstractVector.java
    mahout/trunk/pom.xml

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Fri Sep  9 07:45:16 2011
@@ -17,8 +17,12 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import com.google.common.io.Closeables;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -28,6 +32,7 @@ import org.apache.mahout.math.VarIntWrit
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
+import java.io.IOException;
 import java.util.regex.Pattern;
 
 /**
@@ -38,8 +43,7 @@ public final class TasteHadoopUtils {
   /** Standard delimiter of textual preference data */
   private static final Pattern PREFERENCE_TOKEN_DELIMITER = Pattern.compile("[\t,]");
 
-  private TasteHadoopUtils() {
-  }
+  private TasteHadoopUtils() {}
 
   /**
    * Splits a preference data line into string tokens
@@ -73,4 +77,24 @@ public final class TasteHadoopUtils {
     return indexItemIDMap;
   }
 
+  public static void writeInt(int value, Path path, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FSDataOutputStream out = fs.create(path);
+    try {
+      out.writeInt(value);
+    } finally {
+      Closeables.closeQuietly(out);
+    }
+  }
+
+  public static int readInt(Path path, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FSDataInputStream in = fs.open(path);
+    try {
+      return in.readInt();
+    } finally {
+      Closeables.closeQuietly(in);
+    }
+  }
+
 }
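
A note on usage: these helpers let one phase of a pipeline persist a small
counter to HDFS so that a later phase can read it back instead of re-counting.
A minimal sketch, assuming an illustrative driver and path (the file name
matches PreparePreferenceMatrixJob.NUM_USERS introduced below):

    // Illustrative driver fragment, not part of this commit.
    Configuration conf = new Configuration();
    Path numUsersPath = new Path("/tmp/prep/numUsers.bin");
    TasteHadoopUtils.writeInt(42, numUsersPath, conf);
    int numberOfUsers = TasteHadoopUtils.readInt(numUsersPath, conf);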

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Fri Sep  9 07:45:16 2011
@@ -30,13 +30,15 @@ import java.util.regex.Pattern;
 public abstract class ToEntityPrefsMapper extends
     Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
 
-  public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
+  public static final String TRANSPOSE_USER_ITEM = ToEntityPrefsMapper.class + "transposeUserItem";
+  public static final String RATING_SHIFT = ToEntityPrefsMapper.class + "shiftRatings";
 
   private static final Pattern DELIMITER = Pattern.compile("[\t,]");
 
   private boolean booleanData;
   private boolean transpose;
   private final boolean itemKey;
+  private float ratingShift;
 
   ToEntityPrefsMapper(boolean itemKey) {
     this.itemKey = itemKey;
@@ -47,6 +49,7 @@ public abstract class ToEntityPrefsMappe
     Configuration jobConf = context.getConfiguration();
     booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
     transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false);
+    ratingShift = Float.parseFloat(jobConf.get(RATING_SHIFT, String.valueOf(0f)));
   }
 
   @Override
@@ -67,7 +70,7 @@ public abstract class ToEntityPrefsMappe
     if (booleanData) {
       context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
     } else {
-      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
+      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
       context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
     }
   }
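
The new RATING_SHIFT option simply adds a constant to every parsed preference
value, which can be used, for example, to center ratings around zero. A worked
one-liner with an illustrative shift of -3.0:

    float ratingShift = -3.0f;                                // illustrative
    float prefValue = Float.parseFloat("5.0") + ratingShift;  // yields 2.0f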

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Sep  9 07:45:16 2011
@@ -19,7 +19,6 @@ package org.apache.mahout.cf.taste.hadoo
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -29,22 +28,17 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.ToItemVectorsReducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
-import org.apache.mahout.math.hadoop.similarity.SimilarityType;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
@@ -71,7 +65,7 @@ import java.util.regex.Pattern;
  * <li>-Dmapred.input.dir=(path): Directory containing one or more text files with the preference data</li>
  * <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
  * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate or a predefined similarity
- *  from {@link SimilarityType}</li>
+ *  from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
  * <li>--usersFile (path): only compute recommendations for user IDs contained in this file (optional)</li>
  * <li>--itemsFile (path): only include item IDs from this file in the recommendations (optional)</li>
  * <li>--filterFile (path): file containing comma-separated userID,itemID pairs. Used to exclude the item from the
@@ -94,11 +88,11 @@ public final class RecommenderJob extend
   public static final String BOOLEAN_DATA = "booleanData";
   
   private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
-  private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
 
   @Override
-  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
+  public int run(String[] args) throws Exception {
 
     addInputOption();
     addOutputOption();
@@ -116,19 +110,17 @@ public final class RecommenderJob extend
         + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
     addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities considered per item ",
         String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
-    addOption("maxCooccurrencesPerItem", "mo", "try to cap the number of cooccurrences per item to this "
-        + "number (default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')',
-        String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
-    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
-        + "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')',
-        String.valueOf(SimilarityType.SIMILARITY_COOCCURRENCE));    
+    addOption("maxPrefsPerUserInItemSimilarity", "mppuiis", "max number of preferences to consider per user in the " +
+        "item similarity computation phase, users with more preferences will be sampled down (default: " +
+        DEFAULT_MAX_PREFS_PER_USER + ")", String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+    addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, " +
+        "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
 
     Map<String,String> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
       return -1;
     }
 
-    Path inputPath = getInputPath();
     Path outputPath = getOutputPath();
     int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     String usersFile = parsedArgs.get("--usersFile");
@@ -137,14 +129,11 @@ public final class RecommenderJob extend
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
     int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
     int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
+    int maxPrefsPerUserInItemSimilarity = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserInItemSimilarity"));
     int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
-    int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
     String similarityClassname = parsedArgs.get("--similarityClassname");
 
-    Path userVectorPath = getTempPath("userVectors");
-    Path itemIDIndexPath = getTempPath("itemIDIndex");
-    Path countUsersPath = getTempPath("countUsers");
-    Path itemUserMatrixPath = getTempPath("itemUserMatrix");
+    Path prepPath = getTempPath("preparePreferenceMatrix");
     Path similarityMatrixPath = getTempPath("similarityMatrix");
     Path prePartialMultiplyPath1 = getTempPath("prePartialMultiply1");
     Path prePartialMultiplyPath2 = getTempPath("prePartialMultiply2");
@@ -153,63 +142,38 @@ public final class RecommenderJob extend
 
     AtomicInteger currentPhase = new AtomicInteger();
 
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job itemIDIndex = prepareJob(
-        inputPath, itemIDIndexPath, TextInputFormat.class,
-        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
-        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
-        SequenceFileOutputFormat.class);
-      itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
-      itemIDIndex.waitForCompletion(true);
-    }
-
     int numberOfUsers = -1;
+
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job toUserVector = prepareJob(
-        inputPath, userVectorPath, TextInputFormat.class,
-        ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
-        ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
-        SequenceFileOutputFormat.class);
-      toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData);
-      toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
-      toUserVector.waitForCompletion(true);
+      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[] {
+          "--input", getInputPath().toString(),
+          "--output", prepPath.toString(),
+          "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
+          "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+          "--booleanData", String.valueOf(booleanData),
+          "--tempDir", getTempPath().toString() });
 
-      numberOfUsers = (int) toUserVector.getCounters().findCounter(ToUserVectorReducer.Counters.USERS).getValue();
+      numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
     }
 
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job maybePruneAndTransponse = prepareJob(userVectorPath,
-                                  itemUserMatrixPath,
-                                  SequenceFileInputFormat.class,
-                                  MaybePruneRowsMapper.class,
-                                  IntWritable.class,
-                                  DistributedRowMatrix.MatrixEntryWritable.class,
-                                  ToItemVectorsReducer.class,
-                                  IntWritable.class,
-                                  VectorWritable.class,
-                                  SequenceFileOutputFormat.class);
-      maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
-          maxCooccurrencesPerItem);
-      maybePruneAndTransponse.waitForCompletion(true);
-    }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+      /* special behavior if phase 1 is skipped */
+      if (numberOfUsers == -1) {
+        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+            PathType.LIST, null, getConf());
+      }
+
       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
        * new DistributedRowMatrix(...).rowSimilarity(...) */
-      try {
-        if (numberOfUsers == -1){
-           numberOfUsers = (int) HadoopUtil.countRecords(userVectorPath, PathType.LIST, null, getConf());
-        }
-        ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
-          "-Dmapred.input.dir=" + itemUserMatrixPath,
-          "-Dmapred.output.dir=" + similarityMatrixPath,
-          "--numberOfColumns", String.valueOf(numberOfUsers),
-          "--similarityClassname", similarityClassname,
-          "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1),
-          "--tempDir", getTempPath().toString() });
-      } catch (Exception e) {
-        throw new IllegalStateException("item-item-similarity computation failed", e);
-      }
+      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
+        "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+        "--output", similarityMatrixPath.toString(),
+        "--numberOfColumns", String.valueOf(numberOfUsers),
+        "--similarityClassname", similarityClassname,
+        "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1),
+        "--tempDir", getTempPath().toString() });
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
@@ -220,10 +184,9 @@ public final class RecommenderJob extend
         SequenceFileOutputFormat.class);
       prePartialMultiply1.waitForCompletion(true);
 
-      Job prePartialMultiply2 = prepareJob(
-        userVectorPath, prePartialMultiplyPath2, SequenceFileInputFormat.class,
-        UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
-        Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+      Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+          prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
+          VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
         SequenceFileOutputFormat.class);
       if (usersFile != null) {
         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
@@ -271,7 +234,8 @@ public final class RecommenderJob extend
         setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
       }
       setIOSort(aggregateAndRecommend);
-      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
+          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
       aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
       aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
       aggregateAndRecommend.waitForCompletion(true);
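
With these changes the first phases of RecommenderJob collapse into a single
call to PreparePreferenceMatrixJob, and the similarity computation runs through
the new cooccurrence-based RowSimilarityJob. A hedged example invocation; the
paths are illustrative, and the measure named is one of the classes added in
this commit:

    // Illustrative invocation, not taken from the commit itself.
    ToolRunner.run(new Configuration(), new RecommenderJob(), new String[] {
        "--input", "/prefs.csv",
        "--output", "/recommendations",
        "--similarityClassname",
        "org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.CosineSimilarity",
        "--maxPrefsPerUserInItemSimilarity", "1000",
        "--tempDir", "/tmp/recommender" });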

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java (from r1164967, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=1164967&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java Fri Sep  9 07:45:16 2011
@@ -44,10 +44,10 @@ import org.apache.mahout.math.VectorWrit
  * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
  * </p>
  */
-public final class ToUserVectorReducer extends
+public final class ToUserVectorsReducer extends
     Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
 
-  public static final String MIN_PREFERENCES_PER_USER = ToUserVectorReducer.class.getName() +
+  public static final String MIN_PREFERENCES_PER_USER = ToUserVectorsReducer.class.getName() +
       ".minPreferencesPerUser";
 
   private int minPreferences;

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java Fri Sep  9 07:45:16 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
+import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.Map;
+
+public class PreparePreferenceMatrixJob extends AbstractJob {
+
+  public static final String NUM_USERS = "numUsers.bin";
+  public static final String ITEMID_INDEX = "itemIDIndex";
+  public static final String USER_VECTORS = "userVectors";
+  public static final String RATING_MATRIX = "ratingMatrix";
+
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new PreparePreferenceMatrixJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("maxPrefsPerUser", "mppu", "max number of preferences to consider per user, " +
+        "users with more preferences will be sampled down");
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
+        + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+    addOption("ratingShift", "rs", "shift ratings by this value", String.valueOf(0f));
+
+    Map<String,String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
+    boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
+    float ratingShift = Float.parseFloat(parsedArgs.get("--ratingShift"));
+
+    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
+        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
+        VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+    itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+    itemIDIndex.waitForCompletion(true);
+
+    Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
+        ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+        ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
+    toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
+    toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
+    toUserVectors.waitForCompletion(true);
+
+    int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
+    TasteHadoopUtils.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());
+
+    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
+        ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
+        IntWritable.class, VectorWritable.class);
+    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
+
+    /* configure sampling regarding the uservectors */
+    if (parsedArgs.containsKey("--maxPrefsPerUser")) {
+      int samplingSize = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
+      toItemVectors.getConfiguration().setInt(ToItemVectorsMapper.SAMPLE_SIZE, samplingSize);
+    }
+
+    toItemVectors.waitForCompletion(true);
+
+    return 0;
+  }
+}
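
Since the class provides its own main(), the preparation phase can also be run
standalone. A minimal sketch with illustrative paths; the options mirror those
registered in run() above:

    ToolRunner.run(new Configuration(), new PreparePreferenceMatrixJob(), new String[] {
        "--input", "/prefs.csv",
        "--output", "/prep",
        "--maxPrefsPerUser", "1000",
        "--minPrefsPerUser", "1",
        "--booleanData", "false",
        "--ratingShift", "0.0" });
    // /prep then contains itemIDIndex, userVectors, ratingMatrix and numUsers.bin.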

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java (from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java&r1=1151818&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java Fri Sep  9 07:45:16 2011
@@ -15,107 +15,58 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop;
+package org.apache.mahout.cf.taste.hadoop.preparation;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.cf.taste.common.MinK;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
 
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.Iterator;
 
+public class ToItemVectorsMapper
+    extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
 
-/**
- * tries to limit the number of elements per col to a fixed size and transposes the input afterwards
- */
-public class MaybePruneRowsMapper
-    extends Mapper<VarLongWritable,VectorWritable,IntWritable,DistributedRowMatrix.MatrixEntryWritable> {
-
-  public static final String MAX_COOCCURRENCES = MaybePruneRowsMapper.class.getName() + ".maxCooccurrences";
-  
-  private int maxCooccurrences;
-  private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+  public static final String SAMPLE_SIZE = ToItemVectorsMapper.class + ".sampleSize";
 
   enum Elements {
-    USED, NEGLECTED
+    USER_RATINGS_USED, USER_RATINGS_NEGLECTED
   }
 
+  private int sampleSize;
+
   @Override
   protected void setup(Context ctx) throws IOException, InterruptedException {
-    super.setup(ctx);
-    maxCooccurrences = ctx.getConfiguration().getInt(MAX_COOCCURRENCES, -1);
-    if (maxCooccurrences < 1) {
-      throw new IllegalStateException("Maximum number of cooccurrences was not correctly set!");
-    }
+    sampleSize = ctx.getConfiguration().getInt(SAMPLE_SIZE, Integer.MAX_VALUE);
   }
 
   @Override
   protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
-    throws IOException, InterruptedException {
-    Vector vector = vectorWritable.get();
-    countSeen(vector);
-
-    int numElementsBeforePruning = vector.getNumNondefaultElements();
-    vector = maybePruneVector(vector);
-    int numElementsAfterPruning = vector.getNumNondefaultElements();
-
-    ctx.getCounter(Elements.USED).increment(numElementsAfterPruning);
-    ctx.getCounter(Elements.NEGLECTED).increment(numElementsBeforePruning - numElementsAfterPruning);
-
-    DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
-    int colIndex = TasteHadoopUtils.idToIndex(rowIndex.get());
-    entry.setCol(colIndex);
-    Iterator<Vector.Element> iterator = vector.iterateNonZero();
+      throws IOException, InterruptedException {
+    Vector userRatings = vectorWritable.get();
+
+    int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
+    userRatings = Vectors.maybeSample(userRatings, sampleSize);
+    int numElementsAfterSampling = userRatings.getNumNondefaultElements();
+
+    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
+    VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
+    itemVector.setWritesLaxPrecision(true);
+
+    Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
     while (iterator.hasNext()) {
       Vector.Element elem = iterator.next();
-      entry.setRow(elem.index());
-      entry.setVal(elem.get());
-      ctx.write(new IntWritable(elem.index()), entry);
+      itemVector.get().setQuick(column, elem.get());
+      ctx.write(new IntWritable(elem.index()), itemVector);
     }
-  }
 
-  private void countSeen(Vector vector) {
-    Iterator<Vector.Element> it = vector.iterateNonZero();
-    while (it.hasNext()) {
-      int index = it.next().index();
-      indexCounts.adjustOrPutValue(index, 1, 1);
-    }
+    ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
+    ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
   }
 
-  private Vector maybePruneVector(Vector vector) {
-    if (vector.getNumNondefaultElements() <= maxCooccurrences) {
-      return vector;
-    }
-
-    MinK<Integer> smallCounts = new MinK<Integer>(maxCooccurrences, new Comparator<Integer>() {
-        @Override
-        public int compare(Integer one, Integer two) {
-          return one.compareTo(two);
-        }
-      });
-
-    Iterator<Vector.Element> it = vector.iterateNonZero();
-    while (it.hasNext()) {
-      int count = indexCounts.get(it.next().index());
-      smallCounts.offer(count);
-    }
-
-    int greatestSmallCount = smallCounts.greatestSmall();
-    if (greatestSmallCount > 0) {
-      Iterator<Vector.Element> it2 = vector.iterateNonZero();
-      while (it2.hasNext()) {
-        Vector.Element e = it2.next();
-        if (indexCounts.get(e.index()) > greatestSmallCount) {
-          e.set(0.0);
-        }
-      }
-    }
-    return vector;
-  }
 }

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java (from r1151818, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java&r1=1151818&r2=1167027&rev=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsReducer.java Fri Sep  9 07:45:16 2011
@@ -15,30 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.similarity.item;
+package org.apache.mahout.cf.taste.hadoop.preparation;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 
 import java.io.IOException;
 
-public class ToItemVectorsReducer
-    extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable,IntWritable, VectorWritable> {
-  
-  @Override
-  protected void reduce(IntWritable rowIndex, Iterable<DistributedRowMatrix.MatrixEntryWritable> values, Context ctx)
-    throws IOException, InterruptedException {
+public class ToItemVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
 
-    Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-    for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
-      vector.setQuick(entry.getCol(), entry.getVal());
-    }
-    VectorWritable vectorWritable = new VectorWritable(vector);
+  @Override
+  protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
+      throws IOException, InterruptedException {
+    VectorWritable vectorWritable = VectorWritable.merge(vectors.iterator());
     vectorWritable.setWritesLaxPrecision(true);
-    ctx.write(rowIndex, vectorWritable);
+    ctx.write(row, vectorWritable);
   }
 }
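
Taken together, ToItemVectorsMapper emits one singleton vector per (item, user)
rating, keyed by item index, and this reducer (also registered as a combiner)
folds them into full item rows via VectorWritable.merge. A hedged illustration
of that pattern, assuming merge() unions the non-zero entries of its inputs:

    // Illustrative only; indices and values are made up.
    Vector a = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
    a.setQuick(3, 4.5);   // user index 3 rated this item 4.5
    Vector b = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
    b.setQuick(7, 2.0);   // user index 7 rated this item 2.0
    VectorWritable merged = VectorWritable.merge(
        Arrays.asList(new VectorWritable(a), new VectorWritable(b)).iterator());
    // merged.get() now holds {3: 4.5, 7: 2.0}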

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Fri Sep  9 07:45:16 2011
@@ -24,28 +24,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
-import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
-import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
-import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorReducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
 import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
-import org.apache.mahout.math.hadoop.similarity.SimilarityType;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
 
 /**
  * <p>Distributed precomputation of the item-item-similarities for Itembased Collaborative Filtering</p>
@@ -67,8 +55,8 @@ import org.apache.mahout.math.hadoop.sim
  * <ol>
  * <li>-Dmapred.input.dir=(path): Directory containing one or more text files with the preference data</li>
  * <li>-Dmapred.output.dir=(path): output path where similarity data should be written</li>
- * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate or a predefined similarity
- *  from {@link SimilarityType}</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity measure class to instantiate or a predefined similarity
+ *  from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
  * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li>
  * <li>--maxCooccurrencesPerItem (integer): Maximum number of cooccurrences considered per item (100)</li>
  * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
@@ -84,7 +72,7 @@ public final class ItemSimilarityJob ext
   static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
 
   private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
-  private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
 
   public static void main(String[] args) throws Exception {
@@ -96,17 +84,17 @@ public final class ItemSimilarityJob ext
 
     addInputOption();
     addOutputOption();
-    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
-        + "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')');
+    addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, " +
+        "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
     addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar items per item to this number "
         + "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')',
         String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
-    addOption("maxCooccurrencesPerItem", "mo", "try to cap the number of cooccurrences per item to this number "
-        + "(default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')',
-        String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
+    addOption("maxPrefsPerUser", "mppu", "max number of preferences to consider per user, " +
+        "users with more preferences will be sampled down (default: " + DEFAULT_MAX_PREFS_PER_USER + ")",
+        String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
     addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this "
         + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
-    addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
+    addOption("booleanData", "b", "Treat input as without pref values", String.valueOf(Boolean.FALSE));
 
     Map<String,String> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
@@ -115,79 +103,38 @@ public final class ItemSimilarityJob ext
 
     String similarityClassName = parsedArgs.get("--similarityClassname");
     int maxSimilarItemsPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
-    int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
+    int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
     int minPrefsPerUser = Integer.parseInt(parsedArgs.get("--minPrefsPerUser"));
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
 
-    Path inputPath = getInputPath();
-    Path outputPath = getOutputPath();
-
-    Path itemIDIndexPath = getTempPath("itemIDIndex");
-    Path userVectorPath = getTempPath("userVectors");
-    Path itemUserMatrixPath = getTempPath("itemUserMatrix");
     Path similarityMatrixPath = getTempPath("similarityMatrix");
+    Path prepPath = getTempPath("prepareRatingMatrix");
 
     AtomicInteger currentPhase = new AtomicInteger();
 
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job itemIDIndex = prepareJob(
-        inputPath, itemIDIndexPath, TextInputFormat.class,
-        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
-        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
-        SequenceFileOutputFormat.class);
-      itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
-      itemIDIndex.waitForCompletion(true);
-    }
-
-    int numberOfUsers = 0;
-
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job toUserVector = prepareJob(inputPath,
-                                  userVectorPath,
-                                  TextInputFormat.class,
-                                  ToItemPrefsMapper.class,
-                                  VarLongWritable.class,
-                                  booleanData ? VarLongWritable.class : EntityPrefWritable.class,
-                                  ToUserVectorReducer.class,
-                                  VarLongWritable.class,
-                                  VectorWritable.class,
-                                  SequenceFileOutputFormat.class);
-      toUserVector.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
-      toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
-      toUserVector.waitForCompletion(true);
+    ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
+        "--input", getInputPath().toString(),
+        "--output", prepPath.toString(),
+        "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
+        "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+        "--booleanData", String.valueOf(booleanData),
+        "--tempDir", getTempPath().toString()});
 
-      numberOfUsers = (int) toUserVector.getCounters().findCounter(ToUserVectorReducer.Counters.USERS).getValue();
-    }
-
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job maybePruneAndTransponse = prepareJob(userVectorPath,
-                                  itemUserMatrixPath,
-                                  SequenceFileInputFormat.class,
-                                  MaybePruneRowsMapper.class,
-                                  IntWritable.class,
-                                  DistributedRowMatrix.MatrixEntryWritable.class,
-                                  ToItemVectorsReducer.class,
-                                  IntWritable.class,
-                                  VectorWritable.class,
-                                  SequenceFileOutputFormat.class);
-      maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
-          maxCooccurrencesPerItem);
-      maybePruneAndTransponse.waitForCompletion(true);
-    }
+    int numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
 
     /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
      * new DistributedRowMatrix(...).rowSimilarity(...) */
     ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
-      "-Dmapred.input.dir=" + itemUserMatrixPath,
-      "-Dmapred.output.dir=" + similarityMatrixPath,
-      "--numberOfColumns", String.valueOf(numberOfUsers),
-      "--similarityClassname", similarityClassName,
-      "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem + 1),
-      "--tempDir", getTempPath().toString() });
+        "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+        "--output", similarityMatrixPath.toString(),
+        "--numberOfColumns", String.valueOf(numberOfUsers),
+        "--similarityClassname", similarityClassName,
+        "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem + 1),
+        "--tempDir", getTempPath().toString() });
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       Job mostSimilarItems = prepareJob(similarityMatrixPath,
-                                  outputPath,
+                                  getOutputPath(),
                                   SequenceFileInputFormat.class,
                                   MostSimilarItemPairsMapper.class,
                                   EntityEntityWritable.class,
@@ -197,7 +144,8 @@ public final class ItemSimilarityJob ext
                                   DoubleWritable.class,
                                   TextOutputFormat.class);
       Configuration mostSimilarItemsConf = mostSimilarItems.getConfiguration();
-      mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR, itemIDIndexPath.toString());
+      mostSimilarItemsConf.set(ITEM_ID_INDEX_PATH_STR,
+          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
       mostSimilarItemsConf.setInt(MAX_SIMILARITIES_PER_ITEM, maxSimilarItemsPerItem);
       mostSimilarItems.setCombinerClass(MostSimilarItemPairsReducer.class);
       mostSimilarItems.waitForCompletion(true);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Sep  9 07:45:16 2011
@@ -44,6 +44,8 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.slf4j.Logger;
@@ -123,6 +125,11 @@ public abstract class AbstractJob extend
     return outputPath;
   }
 
+  protected Path getOutputPath(String path) {
+    return new Path(outputPath, path);
+  }
+
+
   protected Path getTempPath() {
     return tempPath;
   }
@@ -399,6 +406,48 @@ public abstract class AbstractJob extend
                            Class<? extends Mapper> mapper,
                            Class<? extends Writable> mapperKey,
                            Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat) throws IOException {
+
+    Job job = new Job(new Configuration(getConf()));
+    Configuration jobConf = job.getConfiguration();
+
+    if (mapper.equals(Mapper.class)) {
+        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
+    }
+    job.setJarByClass(mapper);
+
+    job.setInputFormatClass(inputFormat);
+    jobConf.set("mapred.input.dir", inputPath.toString());
+
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(mapperKey);
+    job.setMapOutputValueClass(mapperValue);
+
+    jobConf.setBoolean("mapred.compress.map.output", true);
+    job.setNumReduceTasks(0);
+
+
+    job.setJobName(getCustomJobName(job, mapper, Reducer.class));
+
+    job.setOutputFormatClass(outputFormat);
+    jobConf.set("mapred.output.dir", outputPath.toString());
+
+    return job;
+  }
+
+  protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper,
+      Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer,
+      Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException {
+    return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer,
+        reducerKey, reducerValue, SequenceFileOutputFormat.class);
+  }
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
                            Class<? extends Reducer> reducer,
                            Class<? extends Writable> reducerKey,
                            Class<? extends Writable> reducerValue,
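
Besides the full overload whose parameter list continues above, AbstractJob now
also offers a map-only prepareJob variant and a shorthand that defaults to
sequence-file input and output. A hedged sketch of the map-only form from
inside an AbstractJob subclass; the mapper, key/value classes and path name
are illustrative:

    // Illustrative map-only job using the new overload.
    Job mapOnly = prepareJob(getInputPath(), getOutputPath("itemVectors"),
        SequenceFileInputFormat.class, ToItemVectorsMapper.class,
        IntWritable.class, VectorWritable.class,
        SequenceFileOutputFormat.class);
    mapOnly.waitForCompletion(true);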

Added: mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/ClassUtils.java Fri Sep  9 07:45:16 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+public class ClassUtils {
+
+  private ClassUtils() {}
+
+  public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) {
+    try {
+      return Class.forName(classname).asSubclass(asSubclassOfClass).newInstance();
+    } catch (ClassNotFoundException cnfe) {
+      throw new IllegalStateException(cnfe);
+    } catch (InstantiationException ie) {
+      throw new IllegalStateException(ie);
+    } catch (IllegalAccessException iae) {
+      throw new IllegalStateException(iae);
+    }
+  }
+
+  public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> asSubclassOfClass) {
+    try {
+      return clazz.asSubclass(asSubclassOfClass).newInstance();
+    } catch (InstantiationException ie) {
+      throw new IllegalStateException(ie);
+    } catch (IllegalAccessException iae) {
+      throw new IllegalStateException(iae);
+    }
+  }
+}
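
A usage sketch for the new helper (not part of the commit): instantiating a similarity measure both by classname and by Class object. CosineSimilarity stands in for any concrete VectorSimilarityMeasure with a public no-arg constructor:

    import org.apache.mahout.common.ClassUtils;
    import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.CosineSimilarity;
    import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure;

    public class ClassUtilsExample {
      public static void main(String[] args) {
        // from a String, e.g. a classname read out of a job Configuration
        VectorSimilarityMeasure byName = ClassUtils.instantiateAs(
            CosineSimilarity.class.getName(), VectorSimilarityMeasure.class);
        // or directly from a Class object
        VectorSimilarityMeasure byClass = ClassUtils.instantiateAs(
            CosineSimilarity.class, VectorSimilarityMeasure.class);
        System.out.println(byName.getClass() + " / " + byClass.getClass());
      }
    }

Both variants wrap the checked reflection exceptions in IllegalStateException, which keeps call sites such as the setup() methods of RowSimilarityJob below free of try/catch clutter.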

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/linkanalysis/PageRankJob.java Fri Sep  9 07:45:16 2011
@@ -106,6 +106,9 @@ public class PageRankJob extends Abstrac
     addOption("teleportationProbability", "tp", "probability to teleport to a random vertex", String.valueOf(0.8));
 
     Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
 
     Path vertexIndex = new Path(parsedArgs.get("--vertexIndex"));
     Path edges = new Path(parsedArgs.get("--edges"));
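
Context for this guard: parseArguments returns null when argument parsing fails or --help is requested, so without the early return the parsedArgs.get(...) calls below it would throw a NullPointerException instead of exiting cleanly with -1. The same convention is used in RowSimilarityJob further down and in the AbstractJob sketch above.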

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Fri Sep  9 07:45:16 2011
@@ -21,6 +21,9 @@ import org.apache.hadoop.conf.Configured
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Preconditions;
+import org.apache.mahout.math.map.OpenDoubleIntHashMap;
+import org.apache.mahout.math.map.OpenIntDoubleHashMap;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -125,7 +128,7 @@ public final class VectorWritable extend
   public static void writeVector(DataOutput out, Vector vector) throws IOException {
     writeVector(out, vector, false);
   }
-  
+
   public static void writeVector(DataOutput out, Vector vector, boolean laxPrecision) throws IOException {
     boolean dense = vector.isDense();
     boolean sequential = vector.isSequentialAccess();
@@ -186,6 +189,21 @@ public final class VectorWritable extend
     return v.get();
   }
 
+  public static VectorWritable merge(Iterator<VectorWritable> vectors) {
+    Vector accumulator = vectors.next().get();
+    while (vectors.hasNext()) {
+      VectorWritable v = vectors.next();
+      if (v != null) {
+        Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
+        while (nonZeroElements.hasNext()) {
+          Vector.Element nonZeroElement = nonZeroElements.next();
+          accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
+        }
+      }
+    }
+    return new VectorWritable(accumulator);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o instanceof VectorWritable) {
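
One subtlety of the relocated merge() worth calling out: it overlays non-zero entries via setQuick() rather than summing them, so the partial vectors are expected to cover disjoint indices (as the partial column vectors of a transpose do), and the iterator must yield at least one element. A small self-contained sketch (plain Java, no Hadoop involved):

    import java.util.Arrays;

    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    public class MergeExample {
      public static void main(String[] args) {
        Vector a = new RandomAccessSparseVector(10);
        a.setQuick(1, 0.5);
        Vector b = new RandomAccessSparseVector(10);
        b.setQuick(4, 2.0);
        // entries are overwritten, not added: merging {1:0.5} and {4:2.0}
        // yields {1:0.5, 4:2.0}; overlapping indices would keep the last value seen
        VectorWritable merged = VectorWritable.merge(
            Arrays.asList(new VectorWritable(a), new VectorWritable(b)).iterator());
        System.out.println(merged.get());
      }
    }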

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=1167027&r1=1167026&r2=1167027&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java Fri Sep  9 07:45:16 2011
@@ -127,28 +127,13 @@ public class TransposeJob extends Abstra
     }
   }
 
-  static Vector merge(Iterator<VectorWritable> vectors) {
-    Vector accumulator = vectors.next().get();
-    while (vectors.hasNext()) {
-      VectorWritable v = vectors.next();
-      if (v != null) {
-        Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
-        while (nonZeroElements.hasNext()) {
-          Vector.Element nonZeroElement = nonZeroElements.next();
-          accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
-        }
-      }
-    }
-    return accumulator;
-  }
-
   public static class MergeVectorsCombiner extends MapReduceBase
         implements Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
 
     @Override
     public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
         OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
-      out.collect(key, new VectorWritable(merge(vectors)));
+      out.collect(key, VectorWritable.merge(vectors));
     }
   }
 
@@ -158,7 +143,8 @@ public class TransposeJob extends Abstra
     @Override
     public void reduce(WritableComparable<?> key, Iterator<VectorWritable> vectors,
         OutputCollector<WritableComparable<?>, VectorWritable> out, Reporter reporter) throws IOException {
-      out.collect(key, new VectorWritable(new SequentialAccessSparseVector(merge(vectors))));
+      Vector merged = VectorWritable.merge(vectors).get();
+      out.collect(key, new VectorWritable(new SequentialAccessSparseVector(merged)));
     }
   }
 }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java?rev=1167027&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java Fri Sep  9 07:45:16 2011
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.common.TopK;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.mapreduce.VectorSumReducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure;
+import org.apache.mahout.math.map.OpenIntDoubleHashMap;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RowSimilarityJob extends AbstractJob {
+
+  static final String SIMILARITY_CLASSNAME = RowSimilarityJob.class + ".distributedSimilarityClassname";
+  static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class + ".numberOfColumns";
+  static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class + ".maxSimilaritiesPerRow";
+  static final String EXCLUDE_SELF_SIMILARITY = RowSimilarityJob.class + ".excludeSelfSimilarity";
+  static final String THRESHOLD = RowSimilarityJob.class + ".threshold";
+
+  static final String NORMS_PATH = RowSimilarityJob.class + ".normsPath";
+  static final String MAXVALUES_PATH = RowSimilarityJob.class + ".maxWeightsPath";
+  static final String NUM_NON_ZERO_ENTRIES_PATH = RowSimilarityJob.class + ".nonZeroEntriesPath";
+
+  private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;
+  private static final double NO_THRESHOLD = Double.MIN_VALUE; // sentinel meaning "no threshold set"
+
+  private static final int NORM_VECTOR_MARKER = Integer.MIN_VALUE;
+  private static final int MAXVALUE_VECTOR_MARKER = Integer.MIN_VALUE + 1;
+  private static final int NUM_NON_ZERO_ENTRIES_VECTOR_MARKER = Integer.MIN_VALUE + 2;
+
+  static enum Counters { ROWS, COOCCURRENCES, PRUNED_COOCCURRENCES }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new RowSimilarityJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("numberOfColumns", "r", "Number of columns in the input matrix");
+    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
+        + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
+    addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: "
+        + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
+    addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to themselves?", String.valueOf(false));
+    addOption("threshold", "tr", "drop row pairs with a similarity value below this");
+
+    Map<String,String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int numberOfColumns = Integer.parseInt(parsedArgs.get("--numberOfColumns"));
+    String similarityClassnameArg = parsedArgs.get("--similarityClassname");
+    String similarityClassname;
+    try {
+      similarityClassname = VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname();
+    } catch (IllegalArgumentException iae) {
+      similarityClassname = similarityClassnameArg;
+    }
+
+    int maxSimilaritiesPerRow = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerRow"));
+    boolean excludeSelfSimilarity = Boolean.parseBoolean(parsedArgs.get("--excludeSelfSimilarity"));
+    double threshold = parsedArgs.containsKey("--threshold") ?
+        Double.parseDouble(parsedArgs.get("--threshold")) : NO_THRESHOLD;
+
+    Path weightsPath = getTempPath("weights");
+    Path normsPath = getTempPath("norms.bin");
+    Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin");
+    Path maxValuesPath = getTempPath("maxValues.bin");
+    Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
+          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
+      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
+      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
+      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
+      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
+      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
+      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
+      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+      normsAndTranspose.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
+          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
+      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
+      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
+      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
+      pairwiseConf.set(NORMS_PATH, normsPath.toString());
+      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
+      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
+      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
+      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
+      pairwiseSimilarity.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
+          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
+          VectorWritable.class);
+      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
+      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
+      asMatrix.waitForCompletion(true);
+    }
+
+    return 0;
+  }
+
+  public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    private Vector norms;
+    private Vector nonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      nonZeroEntries = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      maxValues = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
+        throws IOException, InterruptedException {
+
+      Vector rowVector = similarity.normalize(vectorWritable.get());
+
+      int numNonZeroEntries = 0;
+      double maxValue = Double.MIN_VALUE;
+
+      Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
+      while (nonZeroElements.hasNext()) {
+        Vector.Element element = nonZeroElements.next();
+        RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        partialColumnVector.setQuick(row.get(), element.get());
+        ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
+
+        numNonZeroEntries++;
+        if (maxValue < element.get()) {
+          maxValue = element.get();
+        }
+      }
+
+      if (threshold != NO_THRESHOLD) {
+        nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
+        maxValues.setQuick(row.get(), maxValue);
+      }
+      norms.setQuick(row.get(), similarity.norm(rowVector));
+
+      ctx.getCounter(Counters.ROWS).increment(1);
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, InterruptedException {
+      super.cleanup(ctx);
+      // dirty trick: emit the accumulated norm/statistics vectors with the regular output,
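+      // keyed by reserved marker row ids that MergeVectorsReducer intercepts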
+      ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
+      ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
+      ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
+    }
+  }
+
+  public static class MergeVectorsCombiner extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(row, new VectorWritable(Vectors.merge(partialVectors)));
+    }
+  }
+
+  public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private Path normsPath;
+    private Path numNonZeroEntriesPath;
+    private Path maxValuesPath;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      normsPath = new Path(ctx.getConfiguration().get(NORMS_PATH));
+      numNonZeroEntriesPath = new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH));
+      maxValuesPath = new Path(ctx.getConfiguration().get(MAXVALUES_PATH));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+        throws IOException, InterruptedException {
+      Vector partialVector = Vectors.merge(partialVectors);
+
+      if (row.get() == NORM_VECTOR_MARKER) {
+        Vectors.write(partialVector, normsPath, ctx.getConfiguration());
+      } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
+        Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
+      } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
+        Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
+      } else {
+        ctx.write(row, new VectorWritable(partialVector));
+      }
+    }
+  }
+
+
+  public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+
+    private OpenIntIntHashMap numNonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    private static final Comparator<Vector.Element> BY_INDEX = new Comparator<Vector.Element>() {
+      @Override
+      public int compare(Vector.Element one, Vector.Element two) {
+        return Ints.compare(one.index(), two.index());
+      }
+    };
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numNonZeroEntries = Vectors.readAsIntMap(new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH)),
+          ctx.getConfiguration());
+      maxValues = Vectors.read(new Path(ctx.getConfiguration().get(MAXVALUES_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    private boolean consider(Vector.Element occurrenceA, Vector.Element occurrenceB) {
+      int numNonZeroEntriesA = numNonZeroEntries.get(occurrenceA.index());
+      int numNonZeroEntriesB = numNonZeroEntries.get(occurrenceB.index());
+
+      double maxValueA = maxValues.get(occurrenceA.index());
+
+      return similarity.consider(numNonZeroEntriesA, numNonZeroEntriesB, maxValueA, threshold);
+    }
+
+    @Override
+    protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
+        throws IOException, InterruptedException {
+      Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
+      Arrays.sort(occurrences, BY_INDEX);
+
+      int cooccurrences = 0;
+      int prunedCooccurrences = 0;
+      for (int n = 0; n < occurrences.length; n++) {
+        Vector.Element occurrenceA = occurrences[n];
+        Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        for (int m = n; m < occurrences.length; m++) {
+          Vector.Element occurrenceB = occurrences[m];
+          if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
+            dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
+            cooccurrences++;
+          } else {
+            prunedCooccurrences++;
+          }
+        }
+        ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
+      }
+      ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
+      ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
+    }
+  }
+
+
+  public static class SimilarityReducer
+      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    private int numberOfColumns;
+    private boolean excludeSelfSimilarity;
+    private Vector norms;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numberOfColumns = ctx.getConfiguration().getInt(NUMBER_OF_COLUMNS, -1);
+      Preconditions.checkArgument(numberOfColumns > 0, "Incorrect number of columns!");
+      excludeSelfSimilarity = ctx.getConfiguration().getBoolean(EXCLUDE_SELF_SIMILARITY, false);
+      norms = Vectors.read(new Path(ctx.getConfiguration().get(NORMS_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
+        throws IOException, InterruptedException {
+      Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
+      Vector dots = partialDotsIterator.next().get();
+      while (partialDotsIterator.hasNext()) {
+        Vector toAdd = partialDotsIterator.next().get();
+        Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
+        while (nonZeroElements.hasNext()) {
+          Vector.Element nonZeroElement = nonZeroElements.next();
+          dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
+        }
+      }
+
+      Vector similarities = dots.like();
+      double normA = norms.getQuick(row.get());
+      Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
+      while (dotsWith.hasNext()) {
+        Vector.Element b = dotsWith.next();
+        double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
+        if (similarityValue >= threshold) {
+          similarities.set(b.index(), similarityValue);
+        }
+      }
+      if (excludeSelfSimilarity) {
+        similarities.setQuick(row.get(), 0);
+      }
+      ctx.write(row, new VectorWritable(similarities));
+    }
+  }
+
+  public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>  {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
+        throws IOException, InterruptedException {
+      Vector similarities = similaritiesWritable.get();
+      TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
+      Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
+      while (nonZeroElements.hasNext()) {
+        Vector.Element nonZeroElement = nonZeroElements.next();
+        topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
+        Vector transposedPartial = similarities.like();
+        transposedPartial.setQuick(row.get(), nonZeroElement.get());
+        ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
+      }
+      Vector topKSimilarities = similarities.like();
+      for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
+        topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
+      }
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+  public static class MergeToTopKSimilaritiesReducer
+      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
+        throws IOException, InterruptedException {
+      Vector allSimilarities = Vectors.merge(partials);
+      Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+}
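
To round this off, a hedged sketch of driving the new job programmatically. The paths are placeholders, and SIMILARITY_COSINE assumes the predefined names listed by VectorSimilarityMeasures.list() include a cosine entry; per run() above, a fully qualified VectorSimilarityMeasure classname is accepted as well:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;

    public class RowSimilarityJobExample {
      public static void main(String[] args) throws Exception {
        // input: a SequenceFile<IntWritable,VectorWritable> holding the rows of the matrix
        ToolRunner.run(new RowSimilarityJob(), new String[] {
            "--input", "/path/to/rowVectors",
            "--output", "/path/to/similarityMatrix",
            "--numberOfColumns", "12345",
            "--similarityClassname", "SIMILARITY_COSINE",
            "--maxSimilaritiesPerRow", "50",
            "--excludeSelfSimilarity", "true",
            "--threshold", "0.1"
        });
      }
    }

This runs up to three phases: norms and transposition (VectorNormMapper / MergeVectorsReducer), pairwise dot products (CooccurrencesMapper / SimilarityReducer), and top-k symmetrification (UnsymmetrifyMapper / MergeToTopKSimilaritiesReducer).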