Posted to commits@mahout.apache.org by sr...@apache.org on 2010/07/08 21:19:54 UTC

svn commit: r961888 - in /mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/hadoop/ main/java/org/apache/mahout/cf/taste/hadoop/item/ main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ main/java/org/apache/mahout/math/hadoop/similar...

Author: srowen
Date: Thu Jul  8 19:19:53 2010
New Revision: 961888

URL: http://svn.apache.org/viewvc?rev=961888&view=rev
Log:
MAHOUT-420

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Thu Jul  8 19:19:53 2010
@@ -17,10 +17,17 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * some helper methods for the hadoop-related stuff in org.apache.mahout.cf.taste
@@ -60,4 +67,26 @@ public final class TasteHadoopUtils {
   public static int idToIndex(long id) {
     return 0x7FFFFFFF & ((int) id ^ (int) (id >>> 32));
   }
+  
+  /**
+   * Reads a text-based output file that contains only a single int.
+   *
+   * @param conf the job configuration
+   * @param outputDir the directory holding the part-file to read
+   * @return the int parsed from the file
+   * @throws IOException if the file cannot be read
+   */
+  public static int readIntFromFile(Configuration conf, Path outputDir) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path outputFile = fs.listStatus(outputDir, TasteHadoopUtils.PARTS_FILTER)[0].getPath();
+    InputStream in = null;
+    try  {
+      in = fs.open(outputFile);
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      IOUtils.copyBytes(in, out, conf);
+      return Integer.parseInt(new String(out.toByteArray(), Charset.forName("UTF-8")).trim());
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
 }

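The RecommenderJob changes further down rely on this helper to read back the user count written by the countUsers phase; in essence:

    // from RecommenderJob below: the countUsers job writes a single int to a text part-file
    Path countUsersPath = new Path(tempDirPath, "countUsers");
    int numberOfUsers = TasteHadoopUtils.readIntFromFile(getConf(), countUsersPath);
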
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Thu Jul  8 19:19:53 2010
@@ -30,52 +30,60 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.function.UnaryFunction;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
+/**
+ * <p>Computes prediction values for each user.</p>
+ *
+ * <pre>
+ * u = a user
+ * i = an item not yet rated by u
+ * N = all items similar to i (where similarity is usually computed by pairwise comparison of the item-vectors
+ * of the user-item matrix)
+ *
+ * Prediction(u,i) = sum(all n from N: similarity(i,n) * rating(u,n)) / sum(all n from N: abs(similarity(i,n)))
+ * </pre>
+ */
 public final class AggregateAndRecommendReducer extends
-    Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
 
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String NUM_RECOMMENDATIONS = "numRecommendations";
   static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
-  static final String ITEMS_FILE="itemsFile";
-  
-  private FastIDSet itemsToRecommendFor;
-  
-  private static final PathFilter PARTS_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return path.getName().startsWith("part-");
-    }
-  };
+  static final String ITEMS_FILE = "itemsFile";
 
+  private boolean booleanData;
   private int recommendationsPerUser;
+  private FastIDSet itemsToRecommendFor;
   private OpenIntLongHashMap indexItemIDMap;
 
   @Override
   protected void setup(Context context) {
     Configuration jobConf = context.getConfiguration();
     recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
+    booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
     try {
       FileSystem fs = FileSystem.get(jobConf);
       Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
       indexItemIDMap = new OpenIntLongHashMap();
       VarIntWritable index = new VarIntWritable();
       VarLongWritable id = new VarLongWritable();
-      for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+      for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
         String path = status.getPath().toString();
         SequenceFile.Reader reader =
             new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
@@ -87,15 +95,15 @@ public final class AggregateAndRecommend
     } catch (IOException ioe) {
       throw new IllegalStateException(ioe);
     }
-    
+
     try {
         FileSystem fs = FileSystem.get(jobConf);
-        String usersFilePathString = jobConf.get(ITEMS_FILE);
-        if (usersFilePathString == null) {
+        String itemFilePathString = jobConf.get(ITEMS_FILE);
+        if (itemFilePathString == null) {
         	itemsToRecommendFor = null;
         } else {
         	itemsToRecommendFor = new FastIDSet();
-          Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+          Path usersFilePath = new Path(itemFilePathString).makeQualified(fs);
           FSDataInputStream in = fs.open(usersFilePath);
           for (String line : new FileLineIterable(in)) {
         	  itemsToRecommendFor.add(Long.parseLong(line));
@@ -106,38 +114,113 @@ public final class AggregateAndRecommend
       }
   }
 
+  private static final UnaryFunction ABSOLUTE_VALUES = new UnaryFunction() {
+    @Override
+    public double apply(double value) {
+      return value < 0 ? value * -1 : value;
+    }
+  };
+
   @Override
-  protected void reduce(VarLongWritable key,
-                        Iterable<VectorWritable> values,
+  protected void reduce(VarLongWritable userID,
+                        Iterable<PrefAndSimilarityColumnWritable> values,
                         Context context) throws IOException, InterruptedException {
+    if (booleanData) {
+      reduceBooleanData(userID, values, context);
+    } else {
+      reduceNonBooleanData(userID, values, context);
+    }
+  }
 
-    Vector recommendationVector = null;
-    for (VectorWritable vectorWritable : values) {
-      recommendationVector = recommendationVector == null
-          ? vectorWritable.get()
-          : recommendationVector.plus(vectorWritable.get());
+  private void reduceBooleanData(VarLongWritable userID,
+                        Iterable<PrefAndSimilarityColumnWritable> values,
+                        Context context) throws IOException, InterruptedException {
+
+    /* with boolean data, each estimated preference can only be 1,
+     * so the computation is much simpler */
+    Vector predictionVector = null;
+    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
+      predictionVector = predictionVector == null
+          ? prefAndSimilarityColumn.getSimilarityColumn()
+          : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
+    }
+
+    Iterator<Element> predictions = predictionVector.iterateNonZero();
+    List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>();
+    while (predictions.hasNext() && recommendations.size() < recommendationsPerUser) {
+      int itemIDIndex = predictions.next().index();
+      long itemID = indexItemIDMap.get(itemIDIndex);
+      if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
+        recommendations.add(new GenericRecommendedItem(itemID, 1f));
+      }
     }
-    if (recommendationVector == null) {
+
+    if (!recommendations.isEmpty()) {
+      context.write(userID, new RecommendedItemsWritable(recommendations));
+    }
+  }
+
+  private void reduceNonBooleanData(VarLongWritable userID,
+                        Iterable<PrefAndSimilarityColumnWritable> values,
+                        Context context) throws IOException, InterruptedException {
+    /* each entry here is the sum in the numerator of the prediction formula */
+    Vector numerators = null;
+    /* each entry here is the sum in the denominator of the prediction formula */
+    Vector denominators = null;
+    /* each entry here is the number of similar items used in the prediction formula */
+    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+
+    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
+      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
+      float prefValue = prefAndSimilarityColumn.getPrefValue();
+      /* count the number of items used for each prediction */
+      Iterator<Element> usedItemsIterator = simColumn.iterateNonZero();
+      while (usedItemsIterator.hasNext()) {
+        int itemIDIndex = usedItemsIterator.next().index();
+        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
+      }
+
+      numerators = numerators == null
+          ? prefValue == 1.0f ? simColumn.clone() : simColumn.times(prefValue)
+          : numerators.plus(prefValue == 1.0f ? simColumn : simColumn.times(prefValue));
+
+      simColumn.assign(ABSOLUTE_VALUES);
+      denominators = denominators == null ? simColumn : denominators.plus(simColumn);
+    }
+
+    if (numerators == null) {
       return;
     }
 
+    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    Iterator<Element> iterator = numerators.iterateNonZero();
+    while (iterator.hasNext()) {
+      Element element = iterator.next();
+      int itemIDIndex = element.index();
+      /* preference estimations must be based on at least 2 datapoints */
+      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
+        /* compute normalized prediction */
+        double prediction = element.get() / denominators.getQuick(itemIDIndex);
+        recommendationVector.setQuick(itemIDIndex, prediction);
+      }
+    }
+
     Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
     Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
 
-    Iterator<Vector.Element> recommendationVectorIterator =
-        recommendationVector.iterateNonZero();
+    Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
     while (recommendationVectorIterator.hasNext()) {
       Vector.Element element = recommendationVectorIterator.next();
       int index = element.index();
-  
-      long itemId = indexItemIDMap.get(index);
-      if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemId)) {
+
+      long itemID = indexItemIDMap.get(index);
+      if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
         float value = (float) element.get();
         if (!Float.isNaN(value)) {
           if (topItems.size() < recommendationsPerUser) {
-            topItems.add(new GenericRecommendedItem(itemId, value));
+            topItems.add(new GenericRecommendedItem(itemID, value));
           } else if (value > topItems.peek().getValue()) {
-            topItems.add(new GenericRecommendedItem(itemId, value));
+            topItems.add(new GenericRecommendedItem(itemID, value));
             topItems.poll();
           }
         }
@@ -148,7 +231,7 @@ public final class AggregateAndRecommend
       List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
       recommendations.addAll(topItems);
       Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
-      context.write(key, new RecommendedItemsWritable(recommendations));
+      context.write(userID, new RecommendedItemsWritable(recommendations));
     }
   }
 

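To make the prediction formula above concrete, a worked example with invented values: if item i has two similar items n1 and n2 with similarity(i,n1) = 0.8 and similarity(i,n2) = 0.2, and user u rated them with rating(u,n1) = 5.0 and rating(u,n2) = 3.0, then

    Prediction(u,i) = (0.8 * 5.0 + 0.2 * 3.0) / (0.8 + 0.2) = (4.0 + 0.6) / 1.0 = 4.6

As the reducer code shows, an estimate is only emitted when at least two similar items contributed to it (the numberOfSimilarItemsUsed check).
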
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Thu Jul  8 19:19:53 2010
@@ -18,20 +18,18 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 
 public final class ItemIDIndexMapper extends
     Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
-  
-  private static final Pattern COMMA = Pattern.compile(",");
 
   private boolean transpose;
 
@@ -45,14 +43,9 @@ public final class ItemIDIndexMapper ext
   protected void map(LongWritable key,
                      Text value,
                      Context context) throws IOException, InterruptedException {
-    String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
+    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
     long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
-    int index = idToIndex(itemID);
+    int index = TasteHadoopUtils.idToIndex(itemID);
     context.write(new VarIntWritable(index), new VarLongWritable(itemID));
-  }
-  
-  static int idToIndex(long itemID) {
-    return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
-  }
-  
+  }  
 }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java Thu Jul  8 19:19:53 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+public class MaybePruneRowsMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerItemConsidered;
+    private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      super.setup(ctx);
+      maxSimilaritiesPerItemConsidered =
+          ctx.getConfiguration().getInt(RecommenderJob.MAX_SIMILARITIES_PER_ITEM_CONSIDERED, -1);
+      if (maxSimilaritiesPerItemConsidered < 1) {
+        throw new IllegalStateException("Maximum number of similarities per item was not correctly set!");
+      }
+    }
+
+    @Override
+    protected void map(IntWritable rowIndex, VectorWritable vectorWritable, Context ctx)
+        throws IOException, InterruptedException {
+      Vector vector = vectorWritable.get();
+      countSeen(vector);
+      vector = maybePruneVector(vector);
+      vectorWritable.set(vector);
+      vectorWritable.setWritesLaxPrecision(true);
+      ctx.write(rowIndex, vectorWritable);
+    }
+
+    private void countSeen(Vector vector) {
+      Iterator<Vector.Element> it = vector.iterateNonZero();
+      while (it.hasNext()) {
+        int index = it.next().index();
+        indexCounts.adjustOrPutValue(index, 1, 1);
+      }
+    }
+
+    private Vector maybePruneVector(Vector vector) {
+      if (vector.getNumNondefaultElements() <= maxSimilaritiesPerItemConsidered) {
+        return vector;
+      }
+
+      PriorityQueue<Integer> smallCounts =
+          new PriorityQueue<Integer>(maxSimilaritiesPerItemConsidered + 1, Collections.reverseOrder());
+      Iterator<Vector.Element> it = vector.iterateNonZero();
+      while (it.hasNext()) {
+        int count = indexCounts.get(it.next().index());
+        if (count > 0) {
+          if (smallCounts.size() < maxSimilaritiesPerItemConsidered) {
+            smallCounts.add(count);
+          } else if (count < smallCounts.peek()) {
+            smallCounts.add(count);
+            smallCounts.poll();
+          }
+       }
+     }
+
+     int greatestSmallCount = smallCounts.peek();
+     if (greatestSmallCount > 0) {
+       Iterator<Vector.Element> it2 = vector.iterateNonZero();
+       while (it2.hasNext()) {
+         Vector.Element e = it2.next();
+         if (indexCounts.get(e.index()) > greatestSmallCount) {
+           e.set(0.0);
+         }
+       }
+     }
+     return vector;
+    }
+  }

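To illustrate the pruning with invented counts: with maxSimilaritiesPerItemConsidered = 2 and a vector whose three non-zero indices have been counted 5, 2 and 8 times so far, the two smallest counts are 2 and 5, so greatestSmallCount = 5 and the entry counted 8 times is zeroed. Since indexCounts is accumulated per mapper instance, the pruning is approximate when the input is split across several mappers.
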
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Thu Jul  8 19:19:53 2010
@@ -21,53 +21,37 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
 
+/**
+ * emits, per user, the preference value and similarity-matrix column of each item the user rated
+ */
 public final class PartialMultiplyMapper extends
-    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
+    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
 
   @Override
   protected void map(VarIntWritable key,
                      VectorAndPrefsWritable vectorAndPrefsWritable,
                      Context context) throws IOException, InterruptedException {
 
-    int itemIndex = key.get();
-
-    Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
+    Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
     List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
     List<Float> prefValues = vectorAndPrefsWritable.getValues();
 
     VarLongWritable userIDWritable = new VarLongWritable();
-
-    // These single-element vectors ensure that each user will not be recommended
-    // this item
-    Vector excludeVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
-    excludeVector.set(itemIndex, Double.NaN);
-    VectorWritable excludeWritable = new VectorWritable(excludeVector);
-    excludeWritable.setWritesLaxPrecision(true);
-    for (long userID : userIDs) {
-      userIDWritable.set(userID);
-      context.write(userIDWritable, excludeWritable);
-    }
-
-    VectorWritable vectorWritable = new VectorWritable();
-    vectorWritable.setWritesLaxPrecision(true);
+    PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
 
     for (int i = 0; i < userIDs.size(); i++) {
       long userID = userIDs.get(i);
       float prefValue = prefValues.get(i);
       if (!Float.isNaN(prefValue)) {
-        Vector partialProduct = prefValue == 1.0f ? cooccurrenceColumn : cooccurrenceColumn.times(prefValue);
+        prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
         userIDWritable.set(userID);
-        vectorWritable.set(partialProduct);
-        context.write(userIDWritable, vectorWritable);
+        context.write(userIDWritable, prefAndSimilarityColumn);
       }
     }
-
   }
 
 }
\ No newline at end of file

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java Thu Jul  8 19:19:53 2010
@@ -0,0 +1,70 @@
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class PrefAndSimilarityColumnWritable implements Writable {
+
+  private float prefValue;
+  private Vector similarityColumn;
+
+  public PrefAndSimilarityColumnWritable() {
+    super();
+  }
+
+  public PrefAndSimilarityColumnWritable(float prefValue, Vector similarityColumn) {
+    super();
+    set(prefValue, similarityColumn);
+  }
+
+  public void set(float prefValue, Vector similarityColumn) {
+    this.prefValue = prefValue;
+    this.similarityColumn = similarityColumn;
+  }
+
+  public float getPrefValue() {
+    return prefValue;
+  }
+
+  public Vector getSimilarityColumn() {
+    return similarityColumn;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    prefValue = in.readFloat();
+    VectorWritable vw = new VectorWritable();
+    vw.readFields(in);
+    similarityColumn = vw.get();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(prefValue);
+    VectorWritable vw = new VectorWritable(similarityColumn);
+    vw.setWritesLaxPrecision(true);
+    vw.write(out);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof PrefAndSimilarityColumnWritable) {
+      PrefAndSimilarityColumnWritable other = (PrefAndSimilarityColumnWritable) obj;
+      return prefValue == other.prefValue && similarityColumn.equals(other.similarityColumn);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return RandomUtils.hashFloat(prefValue) + 31 * similarityColumn.hashCode();
+  }
+
+
+}

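A minimal round-trip sketch of the new Writable (test-style code, not part of this commit; assumes the obvious java.io and org.apache.mahout.math imports):

    Vector column = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
    column.set(1, 0.5);
    PrefAndSimilarityColumnWritable original = new PrefAndSimilarityColumnWritable(2.5f, column);

    // serialize
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    // deserialize into a fresh instance
    PrefAndSimilarityColumnWritable copy = new PrefAndSimilarityColumnWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    // copy.getPrefValue() is 2.5f; vector values may lose a little precision
    // because write() serializes the vector with lax (float) precision
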
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Thu Jul  8 19:19:53 2010
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -36,29 +38,38 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersKeyWritable;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixReducer;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
 
 /**
  * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
- * 
+ *
  * <p>Command line arguments specific to this class are:</p>
- * 
+ *
  * <ol>
  * <li>-Dmapred.input.dir=(path): Directory containing a text file containing user IDs
  *  for which recommendations should be computed, one per line</li>
  * <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate</li>
  * <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
  * <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
  * <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
 * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
  * <li>--maxPrefsPerUserConsidered (integer): Maximum number of preferences considered per user in
  *  final recommendation phase (10)</li>
- * <li>--maxCooccurrencesPerItemConsidered: Maximum number of cooccurrences considered per item
- *  in count phase (100)</li>
+ * <li>--maxSimilaritiesPerItemConsidered (integer): Maximum number of similarities considered per item (optional;
+ *  default 100)</li>
  * </ol>
  *
  * <p>General command line options are documented in {@link AbstractJob}.</p>
@@ -68,8 +79,12 @@ import org.apache.mahout.math.VectorWrit
  */
 public final class RecommenderJob extends AbstractJob {
 
+  static final String MAX_SIMILARITIES_PER_ITEM_CONSIDERED = RecommenderJob.class.getName() +
+      ".maxSimilaritiesPerItemConsidered";
+
   public static final String BOOLEAN_DATA = "booleanData";
-  
+  public static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED = 100;
+
   @Override
   public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 
@@ -83,15 +98,16 @@ public final class RecommenderJob extend
     addOption("maxPrefsPerUserConsidered", null,
       "Maximum number of preferences considered per user in final recommendation phase",
       String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
-    addOption("maxCooccurrencesPerItemConsidered", null,
-      "Maximum number of cooccurrences considered per item in count phase",
-      String.valueOf(UserVectorToCooccurrenceMapper.DEFAULT_MAX_COOCCURRENCES_PER_ITEM_CONSIDERED));
+    addOption("maxSimilaritiesPerItemConsidered", null,
+      "Maximum number of similarities considered per item ",
+      String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED));
+    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate");
 
     Map<String,String> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
       return -1;
     }
-    
+
     Path inputPath = getInputPath();
     Path outputPath = getOutputPath();
     Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
@@ -100,11 +116,15 @@ public final class RecommenderJob extend
     String itemsFile = parsedArgs.get("--itemsFile");
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
     int maxPrefsPerUserConsidered = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserConsidered"));
-    int maxCooccurrencesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItemConsidered"));
+    int maxSimilaritiesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItemConsidered"));
+    String similarityClassname = parsedArgs.get("--similarityClassname");
 
     Path userVectorPath = new Path(tempDirPath, "userVectors");
     Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
-    Path cooccurrencePath = new Path(tempDirPath, "cooccurrence");
+    Path countUsersPath = new Path(tempDirPath, "countUsers");
+    Path itemUserMatrixPath = new Path(tempDirPath, "itemUserMatrix");
+    Path maybePruneItemUserMatrixPath = new Path(tempDirPath, "maybePruneItemUserMatrixPath");
+    Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
     Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
     Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
     Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
@@ -132,21 +152,70 @@ public final class RecommenderJob extend
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      Job toCooccurrence = prepareJob(
-        userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
-        UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
-        UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
-        SequenceFileOutputFormat.class);
-      setIOSort(toCooccurrence);
-      toCooccurrence.getConfiguration().setInt(UserVectorToCooccurrenceMapper.MAX_COOCCURRENCES_PER_ITEM_CONSIDERED,
-                                               maxCooccurrencesPerItemConsidered);
-      toCooccurrence.waitForCompletion(true);
+      Job countUsers = prepareJob(inputPath,
+                                  countUsersPath,
+                                  TextInputFormat.class,
+                                  CountUsersMapper.class,
+                                  CountUsersKeyWritable.class,
+                                  VarLongWritable.class,
+                                  CountUsersReducer.class,
+                                  VarIntWritable.class,
+                                  NullWritable.class,
+                                  TextOutputFormat.class);
+        countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
+        countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
+        countUsers.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job itemUserMatrix = prepareJob(inputPath,
+                                  itemUserMatrixPath,
+                                  TextInputFormat.class,
+                                  PrefsToItemUserMatrixMapper.class,
+                                  VarIntWritable.class,
+                                  DistributedRowMatrix.MatrixEntryWritable.class,
+                                  PrefsToItemUserMatrixReducer.class,
+                                  IntWritable.class,
+                                  VectorWritable.class,
+                                  SequenceFileOutputFormat.class);
+      itemUserMatrix.waitForCompletion(true);
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job maybePruneItemUserMatrix = prepareJob(itemUserMatrixPath,
+                                  maybePruneItemUserMatrixPath,
+                                  SequenceFileInputFormat.class,
+                                  MaybePruneRowsMapper.class,
+                                  IntWritable.class,
+                                  VectorWritable.class,
+                                  Reducer.class,
+                                  IntWritable.class,
+                                  VectorWritable.class,
+                                  SequenceFileOutputFormat.class);
+      maybePruneItemUserMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ITEM_CONSIDERED,
+          maxSimilaritiesPerItemConsidered);
+      maybePruneItemUserMatrix.waitForCompletion(true);
+    }
+
+    int numberOfUsers = TasteHadoopUtils.readIntFromFile(getConf(), countUsersPath);
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
+       * new DistributedRowMatrix(...).rowSimilarity(...) */
+      try {
+        RowSimilarityJob.main(new String[] { "-Dmapred.input.dir=" + maybePruneItemUserMatrixPath.toString(),
+            "-Dmapred.output.dir=" + similarityMatrixPath.toString(), "--numberOfColumns",
+            String.valueOf(numberOfUsers), "--similarityClassname", similarityClassname, "--maxSimilaritiesPerRow",
+            String.valueOf(maxSimilaritiesPerItemConsidered + 1), "--tempDir", tempDirPath.toString() });
+      } catch (Exception e) {
+        throw new IllegalStateException("item-item-similarity computation failed", e);
+      }
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       Job prePartialMultiply1 = prepareJob(
-        cooccurrencePath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
-        CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
+        SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
         Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
         SequenceFileOutputFormat.class);
       prePartialMultiply1.waitForCompletion(true);
@@ -173,20 +242,19 @@ public final class RecommenderJob extend
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-    
       Job aggregateAndRecommend = prepareJob(
           partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
-          PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+          PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
           AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
           TextOutputFormat.class);
       Configuration jobConf = aggregateAndRecommend.getConfiguration();
       if (itemsFile != null) {
     	  jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
-    }
+      }
       setIOSort(aggregateAndRecommend);
-      aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
       jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
       jobConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+      jobConf.setBoolean(BOOLEAN_DATA, booleanData);
       aggregateAndRecommend.waitForCompletion(true);
     }
 
@@ -213,9 +281,8 @@ public final class RecommenderJob extend
     // timeout when running these jobs
     conf.setInt("mapred.task.timeout", 60 * 60 * 1000);
   }
-  
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new RecommenderJob(), args);
   }
-  
 }

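Putting the new options together, a sketch of how the job might be driven (paths and values invented):

    // hypothetical invocation, equivalent to passing these flags on the command line
    ToolRunner.run(new Configuration(), new RecommenderJob(), new String[] {
        "-Dmapred.input.dir=/path/to/prefs",
        "-Dmapred.output.dir=/path/to/recommendations",
        "--similarityClassname",
        "org.apache.mahout.math.hadoop.similarity.vector.DistributedCooccurrenceVectorSimilarity",
        "--numRecommendations", "10",
        "--maxSimilaritiesPerItemConsidered", "100" });
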
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java Thu Jul  8 19:19:53 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * maps a row of the similarity matrix to a {@link VectorOrPrefWritable}
+ * 
+ * strictly speaking, a column of that matrix would have to be used, but because the similarity matrix
+ * is symmetric, we can use a row instead of having to transpose it
+ */
+public final class SimilarityMatrixRowWrapperMapper extends
+    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
+
+  @Override
+  protected void map(IntWritable key,
+                     VectorWritable value,
+                     Context context) throws IOException, InterruptedException {
+    Vector similarityMatrixRow = value.get();
+    /* remove self similarity */
+    similarityMatrixRow.set(key.get(), Double.NaN);
+    context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
+  }
+
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Thu Jul  8 19:19:53 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
@@ -52,7 +53,7 @@ public final class ToUserVectorReducer e
                         Context context) throws IOException, InterruptedException {
     Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
     for (VarLongWritable itemPref : itemPrefs) {
-      int index = ItemIDIndexMapper.idToIndex(itemPref.get());
+      int index = TasteHadoopUtils.idToIndex(itemPref.get());
       float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
       userVector.set(index, value);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Thu Jul  8 19:19:53 2010
@@ -35,7 +35,7 @@ public final class ToVectorAndPrefReduce
 
     List<Long> userIDs = new ArrayList<Long>();
     List<Float> prefValues = new ArrayList<Float>();
-    Vector cooccurrenceColumn = null;
+    Vector similarityMatrixColumn = null;
     for (VectorOrPrefWritable value : values) {
       if (value.getVector() == null) {
         // Then this is a user-pref value
@@ -43,19 +43,19 @@ public final class ToVectorAndPrefReduce
         prefValues.add(value.getValue());
       } else {
         // Then this is the column vector
-        if (cooccurrenceColumn != null) {
-          throw new IllegalStateException("Found two co-occurrence columns for item index " + key.get());
+        if (similarityMatrixColumn != null) {
+          throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
         }
-        cooccurrenceColumn = value.getVector();
+        similarityMatrixColumn = value.getVector();
       }
     }
 
-    if (cooccurrenceColumn == null) {
+    if (similarityMatrixColumn == null) {
       return;
     }
 
-    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
+    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
     context.write(key, vectorAndPrefs);
   }
 
-}
\ No newline at end of file
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java Thu Jul  8 19:19:53 2010
@@ -25,7 +25,6 @@ import org.apache.mahout.math.RandomAcce
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
 
 /**

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java Thu Jul  8 19:19:53 2010
@@ -86,7 +86,7 @@ public class RowSimilarityJob extends Ab
   }
 
   @Override
-  public int run(String[] args) throws Exception {
+  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 
     addInputOption();
     addOutputOption();
@@ -341,4 +341,4 @@ public class RowSimilarityJob extends Ab
     }
   }
 
-}
+}
\ No newline at end of file

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java Thu Jul  8 19:19:53 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * uses the co-occurrence count as vector similarity
+ */
+public class DistributedCooccurrenceVectorSimilarity extends AbstractDistributedVectorSimilarity {
+
+  @Override
+  protected double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+      double weightOfVectorB, int numberOfColumns) {
+    return countElements(cooccurrences);
+  }
+
+}

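In other words, doComputeResult here simply returns the number of co-occurrences of the two rows, so the similarity of two items is the count of users who expressed a preference for both.
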
Added: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java Thu Jul  8 19:19:53 2010
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.hadoop.similarity.vector.DistributedTanimotoCoefficientVectorSimilarity;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.easymock.IArgumentMatcher;
+import org.easymock.classextension.EasyMock;
+
+public class RecommenderJobTest extends TasteTestCase {
+
+  /**
+   * tests {@link ItemIDIndexMapper}
+   *
+   * @throws Exception
+   */
+  public void testItemIDIndexMapper() throws Exception {
+    Mapper<LongWritable,Text, VarIntWritable, VarLongWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    context.write(new VarIntWritable(TasteHadoopUtils.idToIndex(789L)), new VarLongWritable(789L));
+    EasyMock.replay(context);
+
+    new ItemIDIndexMapper().map(new LongWritable(123L), new Text("456,789,5.0"), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link ItemIDIndexReducer}
+   *
+   * @throws Exception
+   */
+  public void testItemIDIndexReducer() throws Exception {
+    Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    context.write(new VarIntWritable(123), new VarLongWritable(45L));
+    EasyMock.replay(context);
+
+    new ItemIDIndexReducer().reduce(new VarIntWritable(123), Arrays.asList(new VarLongWritable(67L),
+        new VarLongWritable(89L), new VarLongWritable(45L)), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link ToItemPrefsMapper}
+   *
+   * @throws Exception
+   */
+  public void testToItemPrefsMapper() throws Exception {
+    Mapper<LongWritable,Text, VarLongWritable,VarLongWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    context.write(new VarLongWritable(12L), new EntityPrefWritable(34L, 1f));
+    context.write(new VarLongWritable(56L), new EntityPrefWritable(78L, 2f));
+    EasyMock.replay(context);
+
+    ToItemPrefsMapper mapper = new ToItemPrefsMapper();
+    mapper.map(new LongWritable(123L), new Text("12,34,1"), context);
+    mapper.map(new LongWritable(456L), new Text("56,78,2"), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link ToItemPrefsMapper} using boolean data
+   *
+   * @throws Exception
+   */
+  public void testToItemPrefsMapperBooleanData() throws Exception {
+    Mapper<LongWritable,Text, VarLongWritable,VarLongWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    context.write(new VarLongWritable(12L), new VarLongWritable(34L));
+    context.write(new VarLongWritable(56L), new VarLongWritable(78L));
+    EasyMock.replay(context);
+
+    ToItemPrefsMapper mapper = new ToItemPrefsMapper();
+    setField(mapper, "booleanData", true);
+    mapper.map(new LongWritable(123L), new Text("12,34"), context);
+    mapper.map(new LongWritable(456L), new Text("56,78"), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link ToUserVectorReducer}
+   *
+   * @throws Exception
+   */
+  public void testToUserVectorReducer() throws Exception {
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarLongWritable(12L)), MathHelper.vectorMatches(
+        MathHelper.elem(TasteHadoopUtils.idToIndex(34L), 1d), MathHelper.elem(TasteHadoopUtils.idToIndex(56L), 2d)));
+
+    EasyMock.replay(context);
+
+    List<VarLongWritable> varLongWritables = new LinkedList<VarLongWritable>();
+    varLongWritables.add(new EntityPrefWritable(34L, 1f));
+    varLongWritables.add(new EntityPrefWritable(56L, 2f));
+
+    new ToUserVectorReducer().reduce(new VarLongWritable(12L), varLongWritables, context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link ToUserVectorReducer} using boolean data
+   *
+   * @throws Exception
+   */
+  public void testToUserVectorReducerWithBooleanData() throws Exception {
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarLongWritable(12L)), MathHelper.vectorMatches(
+        MathHelper.elem(TasteHadoopUtils.idToIndex(34L), 1d), MathHelper.elem(TasteHadoopUtils.idToIndex(56L), 1d)));
+
+    EasyMock.replay(context);
+
+    new ToUserVectorReducer().reduce(new VarLongWritable(12L), Arrays.asList(new VarLongWritable(34L),
+        new VarLongWritable(56L)), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link SimilarityMatrixRowWrapperMapper}
+   *
+   * @throws Exception
+   */
+  public void testSimilarityMatrixRowWrapperMapper() throws Exception {
+    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    context.write(EasyMock.eq(new VarIntWritable(12)), vectorOfVectorOrPrefWritableMatches(MathHelper.elem(34, 0.5d),
+        MathHelper.elem(56, 0.7d)));
+
+    EasyMock.replay(context);
+
+    RandomAccessSparseVector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    vector.set(12, 1d);
+    vector.set(34, 0.5d);
+    vector.set(56, 0.7d);
+
+    new SimilarityMatrixRowWrapperMapper().map(new IntWritable(12), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * verifies the {@link Vector} included in a {@link VectorOrPrefWritable}
+   *
+   * @param elements the elements the vector is expected to consist of
+   * @return null, the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable vectorOfVectorOrPrefWritableMatches(final Vector.Element... elements) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          Vector v = ((VectorOrPrefWritable) argument).getVector();
+          return MathHelper.consistsOf(v, elements);
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
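+    // the returned value is ignored; EasyMock matches via the IArgumentMatcher reported above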
+    return null;
+  }
+
+  /**
+   * tests {@link UserVectorSplitterMapper}
+   *
+   * @throws Exception
+   */
+  public void testUserVectorSplitterMapper() throws Exception {
+    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+
+    context.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatches(123L, 0.5f));
+    context.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(context);
+
+    UserVectorSplitterMapper mapper = new UserVectorSplitterMapper();
+    setField(mapper, "maxPrefsPerUserConsidered", 10);
+
+    RandomAccessSparseVector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    vector.set(34, 0.5d);
+    vector.set(56, 0.7d);
+
+    mapper.map(new VarLongWritable(123L), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * verifies a preference in a {@link VectorOrPrefWritable}
+   *
+   * @param userID the expected user ID
+   * @param prefValue the expected preference value
+   * @return null, the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable prefOfVectorOrPrefWritableMatches(final long userID, final float prefValue) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          VectorOrPrefWritable pref = ((VectorOrPrefWritable) argument);
+          return pref.getUserID() == userID && pref.getValue() == prefValue;
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
+    return null;
+  }
+
+  /**
+   * tests {@link UserVectorSplitterMapper} in the special case that some userIDs are excluded
+   *
+   * @throws Exception
+   */
+  public void testUserVectorSplitterMapperUserExclusion() throws Exception {
+    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+
+    context.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatches(123L, 0.5f));
+    context.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(context);
+
+    FastIDSet usersToRecommendFor = new FastIDSet();
+    usersToRecommendFor.add(123L);
+
+    UserVectorSplitterMapper mapper = new UserVectorSplitterMapper();
+    setField(mapper, "maxPrefsPerUserConsidered", 10);
+    setField(mapper, "usersToRecommendFor", usersToRecommendFor);
+
+    RandomAccessSparseVector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    vector.set(34, 0.5d);
+    vector.set(56, 0.7d);
+
+    mapper.map(new VarLongWritable(123L), new VectorWritable(vector), context);
+    mapper.map(new VarLongWritable(456L), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link UserVectorSplitterMapper} in the special case that the number of preferences to be considered
+   * is less than the number of available preferences
+   *
+   * @throws Exception
+   */
+  public void testUserVectorSplitterMapperOnlySomePrefsConsidered() throws Exception {
+    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+
+    context.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatchesNaN(123L));
+    context.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(context);
+
+    UserVectorSplitterMapper mapper = new UserVectorSplitterMapper();
+    setField(mapper, "maxPrefsPerUserConsidered", 1);
+
+    RandomAccessSparseVector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    vector.set(34, 0.5d);
+    vector.set(56, 0.7d);
+
+    mapper.map(new VarLongWritable(123L), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * verifies that a preference value is NaN in a {@link VectorOrPrefWritable}
+   *
+   * @param userID the expected user ID
+   * @return null, the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable prefOfVectorOrPrefWritableMatchesNaN(final long userID) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          VectorOrPrefWritable pref = ((VectorOrPrefWritable) argument);
+          return pref.getUserID() == userID && Float.isNaN(pref.getValue());
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
+    return null;
+  }
+
+  /**
+   * tests {@link ToVectorAndPrefReducer}
+   *
+   * @throws Exception
+   */
+  public void testToVectorAndPrefReducer() throws Exception {
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarIntWritable(1)), vectorAndPrefsWritableMatches(Arrays.asList(123L, 456L),
+        Arrays.asList(1f, 2f), MathHelper.elem(3, 0.5d), MathHelper.elem(7, 0.8d)));
+
+    EasyMock.replay(context);
+
+    Vector similarityColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumn.set(3, 0.5d);
+    similarityColumn.set(7, 0.8d);
+
+    VectorOrPrefWritable itemPref1 = new VectorOrPrefWritable(123L, 1f);
+    VectorOrPrefWritable itemPref2 = new VectorOrPrefWritable(456L, 2f);
+    VectorOrPrefWritable similarities = new VectorOrPrefWritable(similarityColumn);
+
+    new ToVectorAndPrefReducer().reduce(new VarIntWritable(1), Arrays.asList(itemPref1, itemPref2, similarities),
+        context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * verifies a {@link VectorAndPrefsWritable}
+   *
+   * @param userIDs the expected user IDs
+   * @param prefValues the expected preference values
+   * @param elements the expected elements of the similarity vector
+   * @return null, the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorAndPrefsWritable vectorAndPrefsWritableMatches(final List<Long> userIDs,
+      final List<Float> prefValues, final Vector.Element... elements) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorAndPrefsWritable) {
+          VectorAndPrefsWritable vectorAndPrefs = ((VectorAndPrefsWritable) argument);
+
+          if (!vectorAndPrefs.getUserIDs().equals(userIDs)) {
+            return false;
+          }
+          if (!vectorAndPrefs.getValues().equals(prefValues)) {
+            return false;
+          }
+          return MathHelper.consistsOf(vectorAndPrefs.getVector(), elements);
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
+    return null;
+  }
+
+  /**
+   * tests {@link ToVectorAndPrefReducer} in the error case that two similarity column vectors are supplied for the
+   * same item (which should never happen)
+   *
+   * @throws Exception
+   */
+  public void testToVectorAndPrefReducerExceptionOn2Vectors() throws Exception {
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    EasyMock.replay(context);
+
+    Vector similarityColumn1 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    Vector similarityColumn2 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+
+    VectorOrPrefWritable similarities1 = new VectorOrPrefWritable(similarityColumn1);
+    VectorOrPrefWritable similarities2 = new VectorOrPrefWritable(similarityColumn2);
+
+    try {
+      new ToVectorAndPrefReducer().reduce(new VarIntWritable(1), Arrays.asList(similarities1, similarities2), context);
+      fail();
+    } catch (IllegalStateException e) {}
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link PartialMultiplyMapper}
+   *
+   * @throws Exception
+   */
+  public void testPartialMultiplyMapper() throws Exception {
+
+    Vector similarityColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumn.set(3, 0.5d);
+    similarityColumn.set(7, 0.8d);
+
+    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    PrefAndSimilarityColumnWritable one = new PrefAndSimilarityColumnWritable();
+    PrefAndSimilarityColumnWritable two = new PrefAndSimilarityColumnWritable();
+    one.set(1f, similarityColumn);
+    two.set(3f, similarityColumn);
+
+    context.write(EasyMock.eq(new VarLongWritable(123L)), EasyMock.eq(one));
+    context.write(EasyMock.eq(new VarLongWritable(456L)), EasyMock.eq(two));
+
+    EasyMock.replay(context);
+
+    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityColumn, Arrays.asList(123L, 456L),
+        Arrays.asList(1f, 3f));
+
+    new PartialMultiplyMapper().map(new VarIntWritable(1), vectorAndPrefs, context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link AggregateAndRecommendReducer}
+   *
+   * @throws Exception
+   */
+  public void testAggregateAndRecommendReducer() throws Exception {
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f),
+        new GenericRecommendedItem(2L, 2f)));
+
+    EasyMock.replay(context);
+
+    RandomAccessSparseVector similarityColumnOne = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnOne.set(1, 0.1d);
+    similarityColumnOne.set(2, 0.5d);
+
+    RandomAccessSparseVector similarityColumnTwo = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnTwo.set(1, 0.9d);
+    similarityColumnTwo.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> values = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, similarityColumnOne),
+        new PrefAndSimilarityColumnWritable(3f, similarityColumnTwo));
+
+    OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
+    indexItemIDMap.put(1, 1L);
+    indexItemIDMap.put(2, 2L);
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+
+    setField(reducer, "indexItemIDMap", indexItemIDMap);
+    setField(reducer, "recommendationsPerUser", 3);
+
+    reducer.reduce(new VarLongWritable(123L), values, context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link AggregateAndRecommendReducer}, verifying that recommendations based on only one item are excluded
+   *
+   * @throws Exception
+   */
+  public void testAggregateAndRecommendReducerExcludeRecommendationsBasedOnOneItem() throws Exception {
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+
+    EasyMock.replay(context);
+
+    RandomAccessSparseVector similarityColumnOne = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnOne.set(1, 0.1d);
+
+    RandomAccessSparseVector similarityColumnTwo = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnTwo.set(1, 0.9d);
+    similarityColumnTwo.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> values = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, similarityColumnOne),
+        new PrefAndSimilarityColumnWritable(3f, similarityColumnTwo));
+
+    OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
+    indexItemIDMap.put(1, 1L);
+    indexItemIDMap.put(2, 2L);
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+
+    setField(reducer, "indexItemIDMap", indexItemIDMap);
+    setField(reducer, "recommendationsPerUser", 3);
+
+    reducer.reduce(new VarLongWritable(123L), values, context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link AggregateAndRecommendReducer} with a limit on the recommendations per user
+   *
+   * @throws Exception
+   */
+  public void testAggregateAndRecommendReducerLimitNumberOfRecommendations() throws Exception {
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
+      EasyMock.createMock(Reducer.Context.class);
+
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+
+    EasyMock.replay(context);
+
+    RandomAccessSparseVector similarityColumnOne = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnOne.set(1, 0.1d);
+    similarityColumnOne.set(2, 0.5d);
+
+    RandomAccessSparseVector similarityColumnTwo = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    similarityColumnTwo.set(1, 0.9d);
+    similarityColumnTwo.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> values = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, similarityColumnOne),
+        new PrefAndSimilarityColumnWritable(3f, similarityColumnTwo));
+
+    OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
+    indexItemIDMap.put(1, 1L);
+    indexItemIDMap.put(2, 2L);
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+
+    setField(reducer, "indexItemIDMap", indexItemIDMap);
+    setField(reducer, "recommendationsPerUser", 1);
+
+    reducer.reduce(new VarLongWritable(123L), values, context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * verifies a {@link RecommendedItemsWritable}
+   *
+   * @param items the expected recommended items
+   * @return null, the matcher is registered with EasyMock as a side effect
+   */
+  static RecommendedItemsWritable recommendationsMatch(final RecommendedItem... items) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof RecommendedItemsWritable) {
+          RecommendedItemsWritable recommendedItemsWritable = ((RecommendedItemsWritable) argument);
+          List<RecommendedItem> expectedItems = Arrays.asList(items);
+          return expectedItems.equals(recommendedItemsWritable.getRecommendedItems());
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
+    return null;
+  }
+
+  /**
+   * small integration test that runs the full job
+   *
+   * As a tribute to http://www.slideshare.net/srowen/collaborative-filtering-at-scale,
+   * we recommend people food to animals in this test :)
+   *
+   * <pre>
+   *
+   *  user-item-matrix
+   *
+   *          burger  hotdog  berries  icecream
+   *  dog       5       5        2        -
+   *  rabbit    2       -        3        5
+   *  cow       -       5        -        3
+   *  donkey    3       -        -        5
+   *
+   *
+   *  item-item-similarity-matrix (tanimoto-coefficient of the item-vectors of the user-item-matrix)
+   *
+   *          burger  hotdog  berries  icecream
+   *  burger    -      0.25    0.66    0.5
+   *  hotdog   0.25     -      0.33    0.25
+   *  berries  0.66    0.33     -      0.25
+   *  icecream 0.5     0.25    0.25     -
+   *
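+   *  The tanimoto coefficient of two items is the number of users who rated both items
+   *  divided by the number of users who rated at least one of them, e.g.
+   *  similarity(burger, hotdog) = |{dog}| / |{dog, rabbit, cow, donkey}| = 1/4 = 0.25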
+   *
+   *  Prediction(dog, icecream)   = (0.5 * 5 + 0.25 * 5 + 0.25 * 2 ) / (0.5 + 0.25 + 0.25)  ~ 4.3
+   *  Prediction(rabbit, hotdog)  = (0.25 * 2 + 0.33 * 3 + 0.25 * 5) / (0.25 + 0.33 + 0.25) ~ 3.3
+   *  Prediction(cow, burger)     = (0.25 * 5 + 0.5 * 3) / (0.25 + 0.5)                     ~ 3.7
+   *  Prediction(cow, berries)    = (0.33 * 5 + 0.25 * 3) / (0.33 + 0.25)                   ~ 4.1
+   *  Prediction(donkey, hotdog)  = (0.25 * 3 + 0.25 * 5) / (0.25 + 0.25)                   ~ 4
+   *  Prediction(donkey, berries) = (0.66 * 3 + 0.25 * 5) / (0.66 + 0.25)                   ~ 3.6
+   *
+   * </pre>
+   *
+   * @throws Exception
+   */
+  public void testCompleteJob() throws Exception {
+
+    File inputFile = getTestTempFile("prefs.txt");
+    File outputDir = getTestTempDir("output");
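+    // the job creates its output directory itself and fails if it already exists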
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    writeLines(inputFile,
+        "1,1,5",
+        "1,2,5",
+        "1,3,2",
+        "2,1,2",
+        "2,3,3",
+        "2,4,5",
+        "3,2,5",
+        "3,4,3",
+        "4,1,3",
+        "4,4,5");
+
+    RecommenderJob recommenderJob = new RecommenderJob();
+
+    Configuration conf = new Configuration();
+    conf.set("mapred.input.dir", inputFile.getAbsolutePath());
+    conf.set("mapred.output.dir", outputDir.getAbsolutePath());
+    conf.setBoolean("mapred.output.compress", false);
+
+    recommenderJob.setConf(conf);
+
+    recommenderJob.run(new String[] { "--tempDir", tmpDir.getAbsolutePath(), "--similarityClassname",
+       DistributedTanimotoCoefficientVectorSimilarity.class.getName(), "--numRecommendations", String.valueOf(1) });
+
+    Map<Long,List<RecommendedItem>> recommendations = readRecommendations(new File(outputDir, "part-r-00000"));
+
+    assertEquals(4, recommendations.size());
+
+    for (Entry<Long,List<RecommendedItem>> entry : recommendations.entrySet()) {
+      long userID = entry.getKey();
+      List<RecommendedItem> items = entry.getValue();
+      assertNotNull(items);
+      assertEquals(1, items.size());
+      RecommendedItem item = items.get(0);
+
+      if (userID == 1L) {
+        assertEquals(4L, item.getItemID());
+        assertEquals(4.3d, item.getValue(), 0.05d);
+      }
+      if (userID == 2L) {
+        assertEquals(2L, item.getItemID());
+        assertEquals(3.3d, item.getValue(), 0.05d);
+      }
+      if (userID == 3L) {
+        assertEquals(3L, item.getItemID());
+        assertEquals(4.1d, item.getValue(), 0.05d);
+      }
+      if (userID == 4L) {
+        assertEquals(2L, item.getItemID());
+        assertEquals(4d, item.getValue(), 0.05d);
+      }
+    }
+  }
+
+  static Map<Long,List<RecommendedItem>> readRecommendations(File file) throws IOException {
+    Map<Long,List<RecommendedItem>> recommendations = new HashMap<Long,List<RecommendedItem>>();
+    FileLineIterable lineIterable = new FileLineIterable(file);
+    for (String line : lineIterable) {
+
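+      // each output line has the form "userID<TAB>[itemID1:value1,itemID2:value2,...]"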
+      String[] keyValue = line.split("\t");
+      long userID = Long.parseLong(keyValue[0]);
+      String[] tokens = keyValue[1].replaceAll("\\[", "")
+          .replaceAll("\\]", "").split(",");
+
+      List<RecommendedItem> items = new LinkedList<RecommendedItem>();
+      for (String token : tokens) {
+        String[] itemTokens = token.split(":");
+        long itemID = Long.parseLong(itemTokens[0]);
+        float value = Float.parseFloat(itemTokens[1]);
+        items.add(new GenericRecommendedItem(itemID, value));
+      }
+      recommendations.put(userID, items);
+    }
+    return recommendations;
+  }
+
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java Thu Jul  8 19:19:53 2010
@@ -81,10 +81,45 @@ public abstract class MahoutTestCase ext
     return tempFileOrDir;
   }
 
-  protected static void setField(Object target, String fieldname, Object value)
+  /**
+   * try to directly set a (possibly private) field on an object
+   *
+   * @param target the object whose field is to be set
+   * @param fieldname the name of the field to set
+   * @param value the value to set the field to
+   * @throws NoSuchFieldException if no such field exists in the target's class hierarchy
+   * @throws IllegalAccessException if the field cannot be accessed
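+   *
+   * typical use (e.g. in RecommenderJobTest in this commit):
+   * <pre>
+   * UserVectorSplitterMapper mapper = new UserVectorSplitterMapper();
+   * setField(mapper, "maxPrefsPerUserConsidered", 10);
+   * </pre>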
+   */
+  protected void setField(Object target, String fieldname, Object value)
       throws NoSuchFieldException, IllegalAccessException {
-    Field field = target.getClass().getDeclaredField(fieldname);
+    Field field = findDeclaredField(target.getClass(), fieldname);
     field.setAccessible(true);
     field.set(target, value);
   }
+  
+  /**
+   * find a declared field in a class or one of its superclasses
+   *
+   * @param inClass the class to start the search in
+   * @param fieldname the name of the field to find
+   * @return the declared {@link Field}
+   * @throws NoSuchFieldException if no such field can be found
+   */
+  private Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+    if (Object.class.equals(inClass)) {
+      throw new NoSuchFieldException(fieldname);
+    }
+    for (Field field : inClass.getDeclaredFields()) {
+      if (field.getName().equals(fieldname)) {
+        return field;
+      }
+    }
+    return findDeclaredField(inClass.getSuperclass(), fieldname);
+  }
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Thu Jul  8 19:19:53 2010
@@ -136,13 +136,7 @@ public class MathHelper {
       public boolean matches(Object argument) {
         if (argument instanceof VectorWritable) {
           Vector v = ((VectorWritable) argument).get();
-          for (Element element : elements) {
-            boolean matches = Math.abs(element.get() - v.get(element.index())) <= EPSILON;
-            if (!matches) {
-              return false;
-            }
-          }
-          return true;
+          return consistsOf(v, elements);
         }
         return false;
       }
@@ -154,6 +148,44 @@ public class MathHelper {
   }
 
   /**
+   * checks whether the {@link Vector} is equivalent to the set of {@link Vector.Element}s 
+   * 
+   * @param vector the vector to check
+   * @param elements the elements the vector is expected to consist of
+   * @return true iff the vector contains exactly the given elements with the given values
+   */
+  public static boolean consistsOf(Vector vector, Vector.Element... elements) {
+    if (elements.length != numberOfNonZeroNonNaNElements(vector)) {
+      return false;
+    }
+    for (Vector.Element element : elements) {
+      boolean matches = Math.abs(element.get() - vector.get(element.index())) <= EPSILON;
+      if (!matches) {
+        return false;
+      }
+    }
+    return true;    
+  }
+  
+  /**
+   * returns the number of elements in the {@link Vector} that are neither 0 nor NaN
+   * 
+   * @param vector the vector to inspect
+   * @return the number of elements that are neither 0 nor NaN
+   */
+  public static int numberOfNonZeroNonNaNElements(Vector vector) {
+    int elementsInVector = 0;
+    Iterator<Element> vectorIterator = vector.iterateNonZero();
+    while (vectorIterator.hasNext()) {
+      Element currentElement = vectorIterator.next();
+      if (!Double.isNaN(currentElement.get())) {
+        elementsInVector++;
+      }      
+    }
+    return elementsInVector;
+  }
+  
+  /**
    * read a {@link Matrix} from a SequenceFile<IntWritable,VectorWritable>
    * @param fs
    * @param conf

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java Thu Jul  8 19:19:53 2010
@@ -34,8 +34,8 @@ import org.apache.mahout.math.RandomAcce
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.MathHelper;
 import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.hadoop.MathHelper;
 import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob.EntriesToVectorsReducer;
 import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob.SimilarityReducer;
 import org.apache.mahout.math.hadoop.similarity.vector.DistributedTanimotoCoefficientVectorSimilarity;
@@ -275,7 +275,7 @@ public class TestRowSimilarityJob extend
 
     Matrix similarityMatrix =
       MathHelper.readEntries(fs, conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
-
+    
     assertNotNull(similarityMatrix);
     assertEquals(3, similarityMatrix.numCols());
     assertEquals(3, similarityMatrix.numRows());
@@ -367,7 +367,7 @@ public class TestRowSimilarityJob extend
 
     Matrix similarityMatrix =
         MathHelper.readEntries(fs, conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
-
+    
     assertNotNull(similarityMatrix);
     assertEquals(3, similarityMatrix.numCols());
     assertEquals(3, similarityMatrix.numRows());