You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2010/08/18 08:09:39 UTC

svn commit: r986573 - in /mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/hadoop/ main/java/org/apache/mahout/cf/taste/hadoop/item/ main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ main/java/org/apache/mahout/math/hadoop/similar...

Author: ssc
Date: Wed Aug 18 06:09:38 2010
New Revision: 986573

URL: http://svn.apache.org/viewvc?rev=986573&view=rev
Log:
MAHOUT-460 Add maxPreferencesPerItemConsidered option to o.a.m.cf.taste.hadoop.similarity.item.ItemSimilarityJob

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java?rev=986573&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapper.java Wed Aug 18 06:09:38 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+
+/**
+ * Transposes the incoming rows into {@link DistributedRowMatrix.MatrixEntryWritable}s keyed by column index,
+ * trying to cap the number of elements emitted per row at {@code maxCooccurrences} on the way.
+ *
+ * <p>The mapper counts how often each column index has occurred in the rows it has processed so far. These counts
+ * are local to this mapper instance, so the outcome depends on the order in which rows arrive. When a row has more
+ * than {@code maxCooccurrences} non-zero elements, only the elements whose column count does not exceed the largest
+ * of the {@code maxCooccurrences} smallest counts in that row are emitted. All ties at that cutoff are kept, so the
+ * cap is best-effort.</p>
+ *
+ * <p>For every emitted element, the output key and the entry's row are the element's column index. The entry's col
+ * is the index derived from the row key via {@code TasteHadoopUtils.idToIndex}, and the entry's value is the
+ * element's value. The input vector itself is never modified.</p>
+ */
+public class MaybePruneRowsMapper
+    extends Mapper<VarLongWritable,VectorWritable,IntWritable,DistributedRowMatrix.MatrixEntryWritable> {
+
+  /** configuration key for the per-row cap; must be set to a value >= 1, otherwise {@link #setup} fails */
+  public static final String MAX_COOCCURRENCES = MaybePruneRowsMapper.class.getName() + ".maxCooccurrences";
+
+  private int maxCooccurrences;
+  /** number of rows (seen so far by this mapper) in which each column index had a non-zero element */
+  private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    super.setup(ctx);
+    maxCooccurrences = ctx.getConfiguration().getInt(MAX_COOCCURRENCES, -1);
+    if (maxCooccurrences < 1) {
+      throw new IllegalStateException("Maximum number of cooccurrences was not correctly set!");
+    }
+  }
+
+  @Override
+  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
+      throws IOException, InterruptedException {
+    Vector vector = vectorWritable.get();
+    countSeen(vector);
+    int cutoff = pruningCutoff(vector);
+
+    DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
+    int colIndex = TasteHadoopUtils.idToIndex(rowIndex.get());
+    entry.setCol(colIndex);
+    Iterator<Vector.Element> iterator = vector.iterateNonZero();
+    while (iterator.hasNext()) {
+      Vector.Element elem = iterator.next();
+      int index = elem.index();
+      // prune by filtering here instead of zeroing elements in place: modifying a vector while iterating its
+      // non-zero elements depends on the Vector implementation, and it would also alter the input value
+      if (indexCounts.get(index) > cutoff) {
+        continue;
+      }
+      entry.setRow(index);
+      entry.setVal(elem.get());
+      ctx.write(new IntWritable(index), entry);
+    }
+  }
+
+  /** Increments the count of every column index that has a non-zero element in the given row. */
+  private void countSeen(Vector vector) {
+    Iterator<Vector.Element> it = vector.iterateNonZero();
+    while (it.hasNext()) {
+      int index = it.next().index();
+      indexCounts.adjustOrPutValue(index, 1, 1);
+    }
+  }
+
+  /**
+   * Computes the largest column count an element of the given row may have and still be emitted.
+   *
+   * @param vector the current row, whose columns must already have been counted via {@link #countSeen}
+   * @return {@link Integer#MAX_VALUE} (keep everything) if the row has at most {@code maxCooccurrences} non-default
+   *  elements, otherwise the greatest of the {@code maxCooccurrences} smallest column counts in the row
+   */
+  private int pruningCutoff(Vector vector) {
+    if (vector.getNumNondefaultElements() <= maxCooccurrences) {
+      return Integer.MAX_VALUE;
+    }
+
+    // max-heap holding the maxCooccurrences smallest counts seen so far; its head is the current cutoff
+    PriorityQueue<Integer> smallCounts = new PriorityQueue<Integer>(maxCooccurrences + 1, Collections.reverseOrder());
+    Iterator<Vector.Element> it = vector.iterateNonZero();
+    while (it.hasNext()) {
+      int count = indexCounts.get(it.next().index());
+      if (smallCounts.size() < maxCooccurrences) {
+        smallCounts.add(count);
+      } else if (count < smallCounts.peek()) {
+        smallCounts.add(count);
+        smallCounts.poll();
+      }
+    }
+
+    Integer greatestSmallCount = smallCounts.peek();
+    // no non-zero element was iterated (only stored zeros): nothing to prune
+    return greatestSmallCount == null ? Integer.MAX_VALUE : greatestSmallCount;
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed Aug 18 06:09:38 2010
@@ -32,14 +32,14 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersKeyWritable;
 import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper;
 import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixReducer;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.ToItemVectorsReducer;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
@@ -68,9 +68,11 @@ import java.util.regex.Pattern;
  * <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
  * <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
  * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
- * <li>--maxPrefsPerUserConsidered (integer): Maximum number of preferences considered per user in
+ * <li>--maxPrefsPerUser (integer): Maximum number of preferences considered per user in
  *  final recommendation phase (10)</li>
- * <li>--maxSimilaritiesPerItemConsidered (integer): Maximum number of similarities considered per item (optional;
+ * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (optional;
+ *  default 100)</li>
+ * <li>--maxCooccurrencesPerItem (integer): Maximum number of cooccurrences considered per item (optional;
  *  default 100)</li>
  * </ol>
  *
@@ -81,11 +83,10 @@ import java.util.regex.Pattern;
  */
 public final class RecommenderJob extends AbstractJob {
 
-  static final String MAX_SIMILARITIES_PER_ITEM_CONSIDERED = RecommenderJob.class.getName() +
-      ".maxSimilaritiesPerItemConsidered";
-
   public static final String BOOLEAN_DATA = "booleanData";
-  public static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED = 100;
+  
+  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
 
   @Override
   public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
@@ -95,13 +96,16 @@ public final class RecommenderJob extend
     addOption("numRecommendations", "n", "Number of recommendations per user",
         String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
     addOption("usersFile", "u", "File of users to recommend for", null);
-    addOption("itemsFile", "u", "File of items to recommend for", null);
+    addOption("itemsFile", "i", "File of items to recommend for", null);
     addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
-    addOption("maxPrefsPerUserConsidered", null,
+    addOption("maxPrefsPerUser", null,
         "Maximum number of preferences considered per user in final recommendation phase",
         String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
-    addOption("maxSimilaritiesPerItemConsidered", null, "Maximum number of similarities considered per item ",
-        String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED));    
+    addOption("maxSimilaritiesPerItem", null, "Maximum number of similarities considered per item ",
+        String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
+    addOption("maxCooccurrencesPerItem", "o", "try to cap the number of cooccurrences per item to this " +
+        "number (default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')',
+        String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
     addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use " +
         "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')',
         String.valueOf(SimilarityType.SIMILARITY_COOCCURRENCE));    
@@ -118,15 +122,15 @@ public final class RecommenderJob extend
     String usersFile = parsedArgs.get("--usersFile");
     String itemsFile = parsedArgs.get("--itemsFile");
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
-    int maxPrefsPerUserConsidered = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserConsidered"));
-    int maxSimilaritiesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItemConsidered"));
+    int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
+    int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
+    int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
     String similarityClassname = parsedArgs.get("--similarityClassname");
 
     Path userVectorPath = new Path(tempDirPath, "userVectors");
     Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
     Path countUsersPath = new Path(tempDirPath, "countUsers");
     Path itemUserMatrixPath = new Path(tempDirPath, "itemUserMatrix");
-    Path maybePruneItemUserMatrixPath = new Path(tempDirPath, "maybePruneItemUserMatrixPath");
     Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
     Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
     Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
@@ -171,34 +175,19 @@ public final class RecommenderJob extend
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job itemUserMatrix = prepareJob(inputPath,
+      Job maybePruneAndTransponse = prepareJob(userVectorPath,
                                   itemUserMatrixPath,
-                                  TextInputFormat.class,
-                                  PrefsToItemUserMatrixMapper.class,
-                                  VarIntWritable.class,
-                                  DistributedRowMatrix.MatrixEntryWritable.class,
-                                  PrefsToItemUserMatrixReducer.class,
-                                  IntWritable.class,
-                                  VectorWritable.class,
-                                  SequenceFileOutputFormat.class);
-      itemUserMatrix.getConfiguration().setBoolean(PrefsToItemUserMatrixMapper.BOOLEAN_DATA, booleanData);
-      itemUserMatrix.waitForCompletion(true);
-    }
-
-    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job maybePruneItemUserMatrix = prepareJob(itemUserMatrixPath,
-                                  maybePruneItemUserMatrixPath,
                                   SequenceFileInputFormat.class,
                                   MaybePruneRowsMapper.class,
                                   IntWritable.class,
-                                  VectorWritable.class,
-                                  Reducer.class,
+                                  DistributedRowMatrix.MatrixEntryWritable.class,
+                                  ToItemVectorsReducer.class,
                                   IntWritable.class,
                                   VectorWritable.class,
                                   SequenceFileOutputFormat.class);
-      maybePruneItemUserMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ITEM_CONSIDERED,
-          maxSimilaritiesPerItemConsidered);
-      maybePruneItemUserMatrix.waitForCompletion(true);
+      maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
+          maxCooccurrencesPerItem);
+      maybePruneAndTransponse.waitForCompletion(true);
     }
 
     int numberOfUsers = TasteHadoopUtils.readIntFromFile(getConf(), countUsersPath);
@@ -207,10 +196,10 @@ public final class RecommenderJob extend
       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
        * new DistributedRowMatrix(...).rowSimilarity(...) */
       try {
-        RowSimilarityJob.main(new String[] { "-Dmapred.input.dir=" + maybePruneItemUserMatrixPath.toString(),
+        RowSimilarityJob.main(new String[] { "-Dmapred.input.dir=" + itemUserMatrixPath.toString(),
             "-Dmapred.output.dir=" + similarityMatrixPath.toString(), "--numberOfColumns",
             String.valueOf(numberOfUsers), "--similarityClassname", similarityClassname, "--maxSimilaritiesPerRow",
-            String.valueOf(maxSimilaritiesPerItemConsidered + 1), "--tempDir", tempDirPath.toString() });
+            String.valueOf(maxSimilaritiesPerItem + 1), "--tempDir", tempDirPath.toString() });
       } catch (Exception e) {
         throw new IllegalStateException("item-item-similarity computation failed", e);
       }
@@ -233,7 +222,7 @@ public final class RecommenderJob extend
         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
       }
       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
-                                                    maxPrefsPerUserConsidered);
+                                                    maxPrefsPerUser);
       prePartialMultiply2.waitForCompletion(true);
 
       Job partialMultiply = prepareJob(

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Wed Aug 18 06:09:38 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -32,9 +33,14 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
 import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
+import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorReducer;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
@@ -43,12 +49,33 @@ import org.apache.mahout.math.hadoop.Dis
 import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
 import org.apache.mahout.math.hadoop.similarity.SimilarityType;
 
+/**
+ * <p>Distributed precomputation of the item-item-similarities for Itembased Collaborative Filtering</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>-Dmapred.input.dir=(path): Directory containing a text file of preferences, one per line
+ *  (userID, itemID and an optional preference value)</li>
+ * <li>-Dmapred.output.dir=(path): output path where the computed item-item similarities should go</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate</li>
+ * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (optional;
+ *  default 100)</li>
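+ * <li>--maxCooccurrencesPerItem (integer): Maximum number of cooccurrences considered per item (optional;
+ *  default 100)</li>
+ * <li>--booleanData (boolean): Treat input data as having no pref values (optional; default false)</li>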
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other arguments.</p>
+ */
 public final class ItemSimilarityJob extends AbstractJob {
 
-  static final String ITEM_ID_INDEX_PATH_STR = ItemSimilarityJob.class.getName() + "itemIDIndexPathStr";
-  static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + "maxSimilarItemsPerItem";
+  static final String ITEM_ID_INDEX_PATH_STR = ItemSimilarityJob.class.getName() + ".itemIDIndexPathStr";
+  static final String MAX_SIMILARITIES_PER_ITEM = ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
 
   private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_COOCCURRENCES_PER_ITEM = 100;
 
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new ItemSimilarityJob(), args);
@@ -63,6 +90,8 @@ public final class ItemSimilarityJob ext
         "one of the predefined similarities (" + SimilarityType.listEnumNames() + ')');
     addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar items per item to this number " +
         "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')', String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
+    addOption("maxCooccurrencesPerItem", "o", "try to cap the number of cooccurrences per item to this number " +
+        "(default: " + DEFAULT_MAX_COOCCURRENCES_PER_ITEM + ')', String.valueOf(DEFAULT_MAX_COOCCURRENCES_PER_ITEM));
     addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
 
     Map<String,String> parsedArgs = parseArguments(args);
@@ -72,6 +101,7 @@ public final class ItemSimilarityJob ext
 
     String similarityClassName = parsedArgs.get("--similarityClassname");
     int maxSimilarItemsPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
+    int maxCooccurrencesPerItem = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItem"));
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
 
     Path inputPath = getInputPath();
@@ -80,6 +110,7 @@ public final class ItemSimilarityJob ext
 
     Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
     Path countUsersPath = new Path(tempDirPath, "countUsers");
+    Path userVectorPath = new Path(tempDirPath, "userVectors");
     Path itemUserMatrixPath = new Path(tempDirPath, "itemUserMatrix");
     Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
 
@@ -112,18 +143,34 @@ public final class ItemSimilarityJob ext
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job itemUserMatrix = prepareJob(inputPath,
-                                  itemUserMatrixPath,
+      Job toUserVector = prepareJob(inputPath,
+                                  userVectorPath,
                                   TextInputFormat.class,
-                                  PrefsToItemUserMatrixMapper.class,
-                                  VarIntWritable.class,
+                                  ToItemPrefsMapper.class,
+                                  VarLongWritable.class,
+                                  booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+                                  ToUserVectorReducer.class,
+                                  VarLongWritable.class,
+                                  VectorWritable.class,
+                                  SequenceFileOutputFormat.class);
+      toUserVector.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
+      toUserVector.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job maybePruneAndTransponse = prepareJob(userVectorPath,
+                                  itemUserMatrixPath,
+                                  SequenceFileInputFormat.class,
+                                  MaybePruneRowsMapper.class,
+                                  IntWritable.class,
                                   DistributedRowMatrix.MatrixEntryWritable.class,
-                                  PrefsToItemUserMatrixReducer.class,
+                                  ToItemVectorsReducer.class,
                                   IntWritable.class,
                                   VectorWritable.class,
                                   SequenceFileOutputFormat.class);
-      itemUserMatrix.getConfiguration().setBoolean(PrefsToItemUserMatrixMapper.BOOLEAN_DATA, booleanData);
-      itemUserMatrix.waitForCompletion(true);
+      maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
+          maxCooccurrencesPerItem);
+      maybePruneAndTransponse.waitForCompletion(true);
     }
 
     int numberOfUsers = TasteHadoopUtils.readIntFromFile(getConf(), countUsersPath);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixMapper.java Wed Aug 18 06:09:38 2010
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf.taste.hadoop.similarity.item;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
-
-/**
- * creates an item-user-matrix entry from a preference, replacing userID and itemID with int indices
- */
-public class PrefsToItemUserMatrixMapper
-    extends Mapper<LongWritable,Text,VarIntWritable,DistributedRowMatrix.MatrixEntryWritable> {
-
-  public static final String BOOLEAN_DATA = PrefsToItemUserMatrixMapper.class.getName() + ".booleanData";
-
-  private boolean booleanData;
-  
-  @Override
-  protected void setup(Context ctx) throws IOException, InterruptedException {
-    booleanData = ctx.getConfiguration().getBoolean(BOOLEAN_DATA, false);
-  }
-
-  @Override
-  protected void map(LongWritable key, Text value, Context ctx)
-      throws IOException, InterruptedException {
-
-    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
-    long userID = Long.parseLong(tokens[0]);
-    long itemID = Long.parseLong(tokens[1]);
-
-    boolean treatAsBoolean = booleanData || tokens.length < 3;
-    float prefValue = treatAsBoolean ? 1.0f : Float.parseFloat(tokens[2]);
-
-    int row = TasteHadoopUtils.idToIndex(itemID);
-    int column = TasteHadoopUtils.idToIndex(userID);
-
-    DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
-    entry.setRow(row);
-    entry.setCol(column);
-    entry.setVal(prefValue);
-
-    ctx.write(new VarIntWritable(row), entry);
-  }
-
-}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java Wed Aug 18 06:09:38 2010
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf.taste.hadoop.similarity.item;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
-
-/**
- * creates matrix rows ({@link VectorWritable}s) from the {@link MatrixEntryWritable}s
- */
-public class PrefsToItemUserMatrixReducer
-    extends Reducer<VarIntWritable, MatrixEntryWritable,IntWritable,VectorWritable> {
-
-  @Override
-  protected void reduce(VarIntWritable rowIndex, Iterable<MatrixEntryWritable> entries,
-      Context ctx) throws IOException, InterruptedException {
-    Vector row = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-    for (MatrixEntryWritable entry : entries) {
-      row.setQuick(entry.getCol(), entry.getVal());
-    }
-    ctx.write(new IntWritable(rowIndex.get()), new VectorWritable(row));
-  }
-}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java?rev=986573&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorsReducer.java Wed Aug 18 06:09:38 2010
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+import java.io.IOException;
+/** Collects all matrix entries of one row index into a single sparse row {@link VectorWritable}. */
+public class ToItemVectorsReducer
+    extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable,IntWritable, VectorWritable> {
+  // Emits exactly one (rowIndex, vector) pair per input key, reusing the incoming rowIndex as the output key.
+  @Override
+  protected void reduce(IntWritable rowIndex, Iterable<DistributedRowMatrix.MatrixEntryWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+    // Cardinality Integer.MAX_VALUE leaves column indices effectively unbounded; 100 presumably sizes initial storage.
+    Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
+      vector.setQuick(entry.getCol(), entry.getVal()); // entry's own row is not read; the key identifies the row
+    }
+    VectorWritable vectorWritable = new VectorWritable(vector);
+    vectorWritable.setWritesLaxPrecision(true); // NOTE(review): presumably trades value precision for output size — confirm
+    ctx.write(rowIndex, vectorWritable);
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java Wed Aug 18 06:09:38 2010
@@ -76,8 +76,8 @@ public class RowSimilarityJob extends Ab
 
   public static final String DISTRIBUTED_SIMILARITY_CLASSNAME =
       RowSimilarityJob.class.getName() + ".distributedSimilarityClassname";
-  public static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class.getName() + "numberOfRows";
-  public static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class.getName() + "maxSimilaritiesPerRow";
+  public static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class.getName() + ".numberOfRows";
+  public static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class.getName() + ".maxSimilaritiesPerRow";
 
   private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;
 

Added: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java?rev=986573&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/MaybePruneRowsMapperTest.java Wed Aug 18 06:09:38 2010
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.easymock.classextension.EasyMock;
+
+public class MaybePruneRowsMapperTest extends TasteTestCase {
+  // maxCooccurrences = 2 while column 1 occurs in all three rows, so its third occurrence must not be emitted.
+  public void testPruning() throws Exception {
+    Vector v1 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    v1.set(1, 1);
+    v1.set(3, 1);
+
+    Vector v2 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    v2.set(1, 1);
+    v2.set(7, 1);
+
+    Vector v3 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    v3.set(1, 1);
+    v3.set(5, 1);
+    v3.set(9, 1);
+    // Column 1 is set in v1, v2 and v3; columns 3, 7, 5 and 9 are each set only once.
+    MaybePruneRowsMapper mapper = new MaybePruneRowsMapper();
+    setField(mapper, "maxCooccurrences", 2);
+    // createMock (not createStrictMock): the order of the expected write() calls is not checked.
+    Mapper<VarLongWritable,VectorWritable, IntWritable, DistributedRowMatrix.MatrixEntryWritable>.Context ctx =
+      EasyMock.createMock(Mapper.Context.class);
+    // Each entry is expected under its column index as matrixEntryMatches(column, rowId, value), i.e. transposed.
+    ctx.write(EasyMock.eq(new IntWritable(1)), MathHelper.matrixEntryMatches(1, 123, 1));
+    ctx.write(EasyMock.eq(new IntWritable(3)), MathHelper.matrixEntryMatches(3, 123, 1));
+    ctx.write(EasyMock.eq(new IntWritable(1)), MathHelper.matrixEntryMatches(1, 456, 1));
+    ctx.write(EasyMock.eq(new IntWritable(7)), MathHelper.matrixEntryMatches(7, 456, 1));    
+    ctx.write(EasyMock.eq(new IntWritable(5)), MathHelper.matrixEntryMatches(5, 789, 1));
+    ctx.write(EasyMock.eq(new IntWritable(9)), MathHelper.matrixEntryMatches(9, 789, 1));
+    // No write for column 1 of row 789: that is the pruned third occurrence of column 1.
+    EasyMock.replay(ctx);
+    // Row order matters: row 789 is the third row containing column 1.
+    mapper.map(new VarLongWritable(123L), new VectorWritable(v1), ctx);
+    mapper.map(new VarLongWritable(456L), new VectorWritable(v2), ctx);
+    mapper.map(new VarLongWritable(789L), new VectorWritable(v3), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java Wed Aug 18 06:09:38 2010
@@ -41,8 +41,6 @@ import org.apache.mahout.math.VarIntWrit
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.MathHelper;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
 import org.apache.mahout.math.hadoop.similarity.vector.DistributedTanimotoCoefficientVectorSimilarity;
 import org.apache.mahout.math.hadoop.similarity.vector.DistributedUncenteredZeroAssumingCosineVectorSimilarity;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
@@ -111,65 +109,6 @@ public final class ItemSimilarityTest ex
   }
 
   /**
-   * tests {@link PrefsToItemUserMatrixMapper}
-   *
-   * @throws Exception
-   */
-  public void testPrefsToItemUserMatrixMapper() throws Exception {
-    Mapper<LongWritable,Text,VarIntWritable, MatrixEntryWritable>.Context context =
-      EasyMock.createMock(Mapper.Context.class);
-    context.write(EasyMock.eq(new VarIntWritable(TasteHadoopUtils.idToIndex(100L))),
-        MathHelper.matrixEntryMatches(TasteHadoopUtils.idToIndex(100L),
-        TasteHadoopUtils.idToIndex(12L), 1.3));
-    context.write(EasyMock.eq(new VarIntWritable(TasteHadoopUtils.idToIndex(20L))),
-        MathHelper.matrixEntryMatches(TasteHadoopUtils.idToIndex(20L), TasteHadoopUtils.idToIndex(35L), 3.0));
-    EasyMock.replay(context);
-
-    PrefsToItemUserMatrixMapper mapper = new PrefsToItemUserMatrixMapper();
-    mapper.map(null, new Text("12,100,1.3"), context);
-    mapper.map(null, new Text("35,20,3.0"), context);
-
-    EasyMock.verify(context);
-  }
-
-  public void testPrefsToItemUserMatrixMapperBoolean() throws Exception {
-    Mapper<LongWritable,Text,VarIntWritable, MatrixEntryWritable>.Context context =
-      EasyMock.createMock(Mapper.Context.class);
-    context.write(EasyMock.eq(new VarIntWritable(TasteHadoopUtils.idToIndex(100L))),
-        MathHelper.matrixEntryMatches(TasteHadoopUtils.idToIndex(100L),
-        TasteHadoopUtils.idToIndex(12L), 1.0));
-    context.write(EasyMock.eq(new VarIntWritable(TasteHadoopUtils.idToIndex(20L))),
-        MathHelper.matrixEntryMatches(TasteHadoopUtils.idToIndex(20L), TasteHadoopUtils.idToIndex(35L), 1.0));
-    EasyMock.replay(context);
-
-    PrefsToItemUserMatrixMapper mapper = new PrefsToItemUserMatrixMapper();
-    setField(mapper, "booleanData", Boolean.TRUE);
-    mapper.map(null, new Text("12,100"), context);
-    mapper.map(null, new Text("35,20,3.0"), context);
-
-    EasyMock.verify(context);
-  }
-
-  /**
-   * tests {@link PrefsToItemUserMatrixReducer}
-   */
-  public void testPrefsToItemUserMatrixReducer() throws Exception {
-    Reducer<VarIntWritable, MatrixEntryWritable,IntWritable,VectorWritable>.Context context =
-      EasyMock.createMock(Reducer.Context.class);
-
-    context.write(EasyMock.eq(new IntWritable(123)), MathHelper.vectorMatches(MathHelper.elem(1, 0.5),
-        MathHelper.elem(7, 2.0)));
-    EasyMock.replay(context);
-
-    List<MatrixEntryWritable> entries = Arrays.asList(MathHelper.matrixEntry(123, 1, 0.5),
-        MathHelper.matrixEntry(123, 7, 2.0));
-
-    new PrefsToItemUserMatrixReducer().reduce(new VarIntWritable(123), entries, context);
-
-    EasyMock.verify(context);
-  }
-
-  /**
    * tests {@link MostSimilarItemPairsMapper}
    */
   public void testMostSimilarItemsPairsMapper() throws Exception {

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=986573&r1=986572&r2=986573&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Wed Aug 18 06:09:38 2010
@@ -61,7 +61,11 @@ public class MathHelper {
       }
 
       @Override
-      public void appendTo(StringBuffer buffer) {}
+      public void appendTo(StringBuffer buffer) {
+        buffer.append("MatrixEntry[row=").append(row)
+            .append(",col=").append(col)
+            .append(",value=").append(value).append("]");
+      }
     });
     return null;
   }