Posted to commits@mahout.apache.org by sr...@apache.org on 2009/12/11 20:04:24 UTC

svn commit: r889767 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop: ./ item/

Author: srowen
Date: Fri Dec 11 19:04:23 2009
New Revision: 889767

URL: http://svn.apache.org/viewvc?rev=889767&view=rev
Log:
Switch Hadoop job to use column-based computation instead of row-based

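In sketch form, the column-based approach computes a user's recommendation vector
by accumulating the co-occurrence matrix's columns, each scaled by the user's
preference for the corresponding item, instead of scoring every matrix row against
the user vector. A minimal, self-contained illustration with plain arrays (the
class and method names here are hypothetical, not part of this commit):

    // ColumnScoringSketch.java -- hypothetical illustration only
    public class ColumnScoringSketch {
      // scores = sum over rated items i of pref(i) * column(i)
      static double[] recommend(double[][] cooccurrence, double[] userPrefs) {
        double[] scores = new double[cooccurrence.length];
        for (int i = 0; i < userPrefs.length; i++) {
          double pref = userPrefs[i];
          if (pref == 0.0) {
            continue; // only rated items contribute a column
          }
          for (int j = 0; j < cooccurrence.length; j++) {
            scores[j] += pref * cooccurrence[j][i]; // add column i, scaled by pref
          }
        }
        return scores;
      }
      public static void main(String[] args) {
        double[][] c = { {2.0, 1.0}, {1.0, 2.0} };
        double[] prefs = { 5.0, 0.0 };
        double[] s = recommend(c, prefs);
        System.out.println(s[0] + " " + s[1]); // prints "10.0 5.0"
      }
    }

Only the columns for items the user has actually rated are ever touched, which is
what makes the MapFile-based random access introduced below worthwhile.
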
Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java Fri Dec 11 19:04:23 2009
@@ -23,7 +23,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-/** A {@link Writable} encapsulating an item and a preference value. */
+/** A {@link Writable} encapsulating an item ID and a preference value. */
 public final class ItemPrefWritable implements Writable {
 
   private long itemID;

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java?rev=889767&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java Fri Dec 11 19:04:23 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a collection of {@link MapFile}s from which values may be looked up
+ * by key. It simply provides a convenient way to open them all and to search
+ * across all of them.
+ */
+public final class MapFilesMap<K extends WritableComparable, V extends Writable> implements Closeable {
+
+  private static final PathFilter PARTS_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part-");
+    }
+  };
+
+  private final List<MapFile.Reader> readers;
+
+  public MapFilesMap(FileSystem fs,
+                     Path parentDir,
+                     Configuration conf) throws IOException {
+    readers = new ArrayList<MapFile.Reader>();
+    try {
+      for (FileStatus status : fs.listStatus(parentDir, PARTS_FILTER)) {
+        readers.add(new MapFile.Reader(fs, status.getPath().toString(), conf));
+      }
+    } catch (IOException ioe) {
+      close();
+      throw ioe;
+    }
+    if (readers.isEmpty()) {
+      throw new IllegalArgumentException("No MapFiles found in " + parentDir);
+    }
+  }
+
+  /**
+   * @return the given value object, filled in with the value data, if the key is found; null otherwise
+   */
+  public V get(K key, V value) throws IOException {
+    for (MapFile.Reader reader : readers) {
+      if (reader.get(key, value) != null) {
+        return value;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+    for (MapFile.Reader reader : readers) {
+      try {
+        reader.close();
+      } catch (IOException ioe) {
+        // continue
+      }
+    }
+  }
+}

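Typical use of the new class, as RecommenderMapper does below, is to open all the
part-* MapFiles under a job's output directory and probe them by key. A usage
sketch (the path "itemIDIndex" and the key 42 are hypothetical; assumes the same
Hadoop imports MapFilesMap itself uses):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFilesMap<IntWritable,LongWritable> map =
        new MapFilesMap<IntWritable,LongWritable>(fs, new Path("itemIDIndex"), conf);
    try {
      LongWritable itemID = new LongWritable();
      if (map.get(new IntWritable(42), itemID) != null) {
        System.out.println("item ID for index 42: " + itemID.get());
      }
    } finally {
      map.close();
    }

Note that get() probes each reader in turn, so lookup cost grows with the number of
part files, but each individual probe is an index-assisted seek rather than a scan.
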
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Fri Dec 11 19:04:23 2009
@@ -97,7 +97,9 @@
       result.append(':');
       String valueString = String.valueOf(item.getValue());
       // Is this rounding too crude?
-      if (valueString.length() > 6) {
+      if (valueString.indexOf('E') >= 0) {
+        valueString = "0.0";
+      } else if (valueString.length() > 6) {
         valueString = valueString.substring(0, 6);
       }
       result.append(valueString);

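For context on the change above: naive truncation garbles values that print in
scientific notation, so such values are now rendered as "0.0" instead. A small
illustration (the literal is hypothetical):

    String valueString = String.valueOf(1.4E-45f);   // "1.4E-45"
    System.out.println(valueString.substring(0, 6)); // "1.4E-4" -- off by 41 orders of magnitude
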
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Fri Dec 11 19:04:23 2009
@@ -27,6 +27,19 @@
 import java.io.IOException;
 import java.util.regex.Pattern;
 
+/**
+ * <h1>Input</h1>
+ *
+ * <p>Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts
+ * byte offset / line pairs as {@link LongWritable}/{@link Text} pairs.</p>
+ *
+ * <p>Each line is assumed to be of the form <code>userID,itemID,preference</code>.</p>
+ *
+ * <h1>Output</h1>
+ *
+ * <p>Outputs the user ID as a {@link LongWritable} mapped to the item ID and preference
+ * as an {@link ItemPrefWritable}.</p>
+ */
 public final class ToItemPrefsMapper
     extends MapReduceBase
     implements Mapper<LongWritable, Text, LongWritable, ItemPrefWritable> {

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Fri Dec 11 19:04:23 2009
@@ -41,11 +41,11 @@
                   Reporter reporter) throws IOException {
     String[] tokens = COMMA.split(value.toString());
     long itemID = Long.parseLong(tokens[1]);
-    int index = itemIDToIndex(itemID);
+    int index = idToIndex(itemID);
     output.collect(new IntWritable(index), new LongWritable(itemID));
   }
 
-  static int itemIDToIndex(long itemID) {
+  static int idToIndex(long itemID) {
     return (int) (itemID) ^ (int) (itemID >>> 32);
   }
 

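The renamed idToIndex folds a 64-bit ID into a 32-bit vector index by XORing the
two halves together, the same recipe as Long.hashCode(). A quick check (the ID is
hypothetical):

    long itemID = 123456789012345L;
    int index = (int) itemID ^ (int) (itemID >>> 32);
    // identical to Long.valueOf(itemID).hashCode(); note the result can be
    // negative, and distinct IDs can collide in 32 bits
    System.out.println(index == Long.valueOf(itemID).hashCode()); // true
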
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Dec 11 19:04:23 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -83,7 +84,7 @@
                                              ItemIDIndexReducer.class,
                                              IntWritable.class,
                                              LongWritable.class,
-                                             SequenceFileOutputFormat.class);
+                                             MapFileOutputFormat.class);
     JobClient.runJob(itemIDIndexConf);
 
     JobConf toUserVectorConf = prepareJobConf(inputPath,
@@ -109,7 +110,7 @@
                                                 UserVectorToCooccurrenceReducer.class,
                                                 IntWritable.class,
                                                 SparseVector.class,
-                                                SequenceFileOutputFormat.class);
+                                                MapFileOutputFormat.class);
     JobClient.runJob(toCooccurrenceConf);
 
     JobConf recommenderConf = prepareJobConf(userVectorPath,

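Switching these two jobs from SequenceFileOutputFormat to MapFileOutputFormat is
what enables the keyed lookups in RecommenderMapper below: a MapFile pairs the
sorted data file with an index file, so readers can seek to a key instead of
scanning. The relevant configuration in isolation (a sketch against the old
mapred API this code uses):

    JobConf conf = new JobConf();
    conf.setOutputFormat(MapFileOutputFormat.class); // part-* outputs become MapFiles
    conf.setOutputKeyClass(IntWritable.class);       // MapFile keys must be WritableComparable
    conf.setOutputValueClass(LongWritable.class);
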
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java Fri Dec 11 19:04:23 2009
@@ -18,20 +18,17 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.matrix.SparseVector;
@@ -53,40 +50,19 @@
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
 
-  private static final PathFilter IGNORABLE_FILES_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path path) {
-      return !path.getName().startsWith("_logs");
-    }
-  };
-
-  private FileSystem fs;
-  private Path cooccurrencePath;
   private int recommendationsPerUser;
-  private FastByIDMap<Long> indexItemIDMap;
+  private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+  private MapFilesMap<IntWritable,Vector> cooccurrenceColumnMap;
 
   @Override
   public void configure(JobConf jobConf) {
     try {
-      fs = FileSystem.get(jobConf);
-    } catch (IOException ioe) {
-      throw new IllegalStateException(ioe);
-    }
-    cooccurrencePath = new Path(jobConf.get(COOCCURRENCE_PATH)).makeQualified(fs);
-    Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
-    recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
-    indexItemIDMap = new FastByIDMap<Long>();
-    try {
-      IntWritable index = new IntWritable();
-      LongWritable itemID = new LongWritable();
-      Configuration conf = new Configuration();
-      for (FileStatus status : fs.listStatus(itemIDIndexPath, IGNORABLE_FILES_FILTER)) {
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
-        while (reader.next(index, itemID)) {
-          indexItemIDMap.put(index.get(), itemID.get());
-        }
-        reader.close();
-      }
+      FileSystem fs = FileSystem.get(jobConf);
+      Path cooccurrencePath = new Path(jobConf.get(COOCCURRENCE_PATH)).makeQualified(fs);
+      Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
+      recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
+      indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+      cooccurrenceColumnMap = new MapFilesMap<IntWritable,Vector>(fs, cooccurrencePath, new Configuration());
     } catch (IOException ioe) {
       throw new IllegalStateException(ioe);
     }
@@ -97,45 +73,45 @@
                   SparseVector userVector,
                   OutputCollector<LongWritable, RecommendedItemsWritable> output,
                   Reporter reporter) throws IOException {
-    IntWritable indexWritable = new IntWritable();
-    Vector cooccurrenceVector = new SparseVector(Integer.MAX_VALUE, 1000);
-    Configuration conf = new Configuration();
+
+    Iterator<Vector.Element> userVectorIterator = userVector.iterateNonZero();
+    Vector recommendationVector = new SparseVector(Integer.MAX_VALUE, 1000);
+    Vector columnVector = new SparseVector(Integer.MAX_VALUE, 1000);
+    while (userVectorIterator.hasNext()) {
+      Vector.Element element = userVectorIterator.next();
+      int index = element.index();
+      double value = element.get();
+      cooccurrenceColumnMap.get(new IntWritable(index), columnVector);
+      columnVector.times(value).addTo(recommendationVector);
+    }
+
     Queue<RecommendedItem> topItems =
-        new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
-    for (FileStatus status : fs.listStatus(cooccurrencePath, IGNORABLE_FILES_FILTER)) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
-      while (reader.next(indexWritable, cooccurrenceVector)) {
-        Long itemID = indexItemIDMap.get(indexWritable.get());
-        if (itemID != null) {
-          processOneRecommendation(userVector, itemID, cooccurrenceVector, topItems);
-        } else {
-          throw new IllegalStateException("Found index without item ID: " + indexWritable.get());
-        }
+      new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
+
+    Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
+    LongWritable itemID = new LongWritable();
+    while (recommendationVectorIterator.hasNext()) {
+      Vector.Element element = recommendationVectorIterator.next();
+      if (topItems.size() < recommendationsPerUser) {
+        indexItemIDMap.get(new IntWritable(element.index()), itemID);
+        topItems.add(new GenericRecommendedItem(itemID.get(), (float) element.get()));
+      } else if (element.get() > topItems.peek().getValue()) {
+        indexItemIDMap.get(new IntWritable(element.index()), itemID);
+        topItems.add(new GenericRecommendedItem(itemID.get(), (float) element.get()));
+        topItems.poll();
       }
-      reader.close();
     }
+
     List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
     recommendations.addAll(topItems);
+    Collections.sort(recommendations);
     output.collect(userID, new RecommendedItemsWritable(recommendations));
   }
 
-  private void processOneRecommendation(Vector userVector,
-                                        long itemID,
-                                        Vector cooccurrenceVector,
-                                        Queue<RecommendedItem> topItems) {
-    double totalWeight = 0.0;
-    Iterator<Vector.Element> cooccurrences = cooccurrenceVector.iterateNonZero();
-    while (cooccurrences.hasNext()) {
-      Vector.Element cooccurrence = cooccurrences.next();
-      totalWeight += cooccurrence.get();
-    }
-    double score = userVector.dot(cooccurrenceVector) / totalWeight;
-    if (!Double.isNaN(score)) {
-      topItems.add(new GenericRecommendedItem(itemID, (float) score));
-      if (topItems.size() > recommendationsPerUser) {
-        topItems.poll();
-      }
-    }
+  @Override
+  public void close() {
+    indexItemIDMap.close();
+    cooccurrenceColumnMap.close();
   }
 
 }
\ No newline at end of file

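The rewritten map() above keeps the best recommendationsPerUser items with a
bounded priority queue: the head is the weakest of the current best, and any
element that beats it is added while the head is evicted. The pattern in
isolation, over plain doubles (values are hypothetical; assumes java.util
imports):

    int n = 3;
    Queue<Double> top = new PriorityQueue<Double>(n + 1); // min-heap of the best n
    for (double v : new double[] {5.0, 1.0, 9.0, 4.0, 7.0}) {
      if (top.size() < n) {
        top.add(v);
      } else if (v > top.peek()) { // beats the weakest of the current best
        top.add(v);
        top.poll();                // evict the weakest
      }
    }
    System.out.println(top);       // the three largest: [5.0, 7.0, 9.0]
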
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Fri Dec 11 19:04:23 2009
@@ -24,14 +24,37 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.ItemPrefWritable;
 import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Queue;
 
+/**
+ * <h1>Input</h1>
+ *
+ * <p>Takes user IDs as {@link LongWritable} mapped to all associated item IDs
+ * and preference values, as {@link ItemPrefWritable}s.</p>
+ *
+ * <h1>Output</h1>
+ *
+ * <p>The same user ID mapped to a {@link SparseVector} representation of the
+ * same item IDs and preference values. Item IDs are hashed into ints by
+ * {@link ItemIDIndexMapper#idToIndex(long)} to serve as vector indexes; the
+ * index-to-ID mapping is preserved for later by {@link ItemIDIndexMapper}
+ * together with {@link ItemIDIndexReducer}.</p>
+ *
+ * <p>The number of non-default elements actually retained in the user vector is capped
+ * at {@link #MAX_PREFS_CONSIDERED}.</p>
+ */
 public final class ToUserVectorReducer
     extends MapReduceBase
     implements Reducer<LongWritable, ItemPrefWritable, LongWritable, SparseVector> {
 
+  public static final int MAX_PREFS_CONSIDERED = 50;
+
   @Override
   public void reduce(LongWritable userID,
                      Iterator<ItemPrefWritable> itemPrefs,
@@ -41,11 +64,40 @@
       SparseVector userVector = new SparseVector(Integer.MAX_VALUE, 100);
       while (itemPrefs.hasNext()) {
         ItemPrefWritable itemPref = itemPrefs.next();
-        int index = ItemIDIndexMapper.itemIDToIndex(itemPref.getItemID());
+        int index = ItemIDIndexMapper.idToIndex(itemPref.getItemID());
         userVector.set(index, itemPref.getPrefValue());
       }
+
+      if (userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
+        double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
+        SparseVector filteredVector = new SparseVector(Integer.MAX_VALUE, MAX_PREFS_CONSIDERED);
+        Iterator<Vector.Element> it = userVector.iterateNonZero();
+        while (it.hasNext()) {
+          Vector.Element element = it.next();
+          if (element.get() >= cutoff) {
+            filteredVector.set(element.index(), element.get());
+          }
+        }
+        userVector = filteredVector;
+      }
+
       output.collect(userID, userVector);
     }
   }
 
+  private static double findTopNPrefsCutoff(int n, Vector userVector) {
+    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      double prefValue = it.next().get();
+      if (topPrefValues.size() < n) {
+        topPrefValues.add(prefValue);
+      } else if (prefValue > topPrefValues.peek()) {
+        topPrefValues.add(prefValue);
+        topPrefValues.poll();
+      }
+    }
+    return topPrefValues.peek();
+  }
+
 }

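The new cap works in two passes: findTopNPrefsCutoff() finds the N-th largest
preference value, then the reduce() loop copies across only elements at or above
that cutoff. One subtlety: ties at the cutoff are all kept, so the filtered vector
can hold slightly more than MAX_PREFS_CONSIDERED elements. A standalone sketch of
the cutoff-then-filter idea (data is hypothetical; assumes java.util imports):

    double[] prefs = {3.0, 9.0, 4.0, 7.0, 5.0, 5.0};
    int n = 3;
    Queue<Double> heap = new PriorityQueue<Double>(n + 1);
    for (double p : prefs) {
      if (heap.size() < n) {
        heap.add(p);
      } else if (p > heap.peek()) {
        heap.add(p);
        heap.poll();
      }
    }
    double cutoff = heap.peek(); // 5.0, the 3rd-largest value
    for (double p : prefs) {
      if (p >= cutoff) {
        System.out.println(p);   // keeps 9.0, 7.0, 5.0, 5.0 -- four survive the tie
      }
    }
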
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Fri Dec 11 19:04:23 2009
@@ -28,63 +28,30 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
 
 public final class UserVectorToCooccurrenceMapper
     extends MapReduceBase
     implements Mapper<LongWritable, SparseVector, IntWritable, IntWritable> {
 
-  private static final int MAX_PREFS_CONSIDERED = 50;
-
   @Override
   public void map(LongWritable userID,
                   SparseVector userVector,
                   OutputCollector<IntWritable, IntWritable> output,
                   Reporter reporter) throws IOException {
-
-    double cutoff = userVector.size() <= MAX_PREFS_CONSIDERED ?
-        Double.NEGATIVE_INFINITY : findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
-
     Iterator<Vector.Element> it = userVector.iterateNonZero();
-
     while (it.hasNext()) {
       Vector.Element next1 = it.next();
-      if (next1.get() >= cutoff) {
-
-        int index1 = next1.index();
-        Iterator<Vector.Element> it2 = userVector.iterateNonZero();
-        IntWritable itemWritable1 = new IntWritable(index1);
-
-        while (it2.hasNext()) {
-          Vector.Element next2 = it2.next();
-          if (next2.get() >= cutoff) {
-
-            int index2 = next2.index();
-            if (index1 != index2) {
-              output.collect(itemWritable1, new IntWritable(index2));
-            }
-
-          }
+      int index1 = next1.index();
+      Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+      IntWritable itemWritable1 = new IntWritable(index1);
+      while (it2.hasNext()) {
+        Vector.Element next2 = it2.next();
+        int index2 = next2.index();
+        if (index1 != index2) {
+          output.collect(itemWritable1, new IntWritable(index2));
         }
-
-      }
-    }
-  }
-
-  private static double findTopNPrefsCutoff(int n, Vector userVector) {
-    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
-    Iterator<Vector.Element> it = userVector.iterateNonZero();
-    while (it.hasNext()) {
-      double prefValue = it.next().get();
-      if (topPrefValues.size() < n) {
-        topPrefValues.add(prefValue);
-      } else if (prefValue > topPrefValues.peek()) {
-        topPrefValues.add(prefValue);
-        topPrefValues.poll();
       }
     }
-    return topPrefValues.peek();
   }
 
 }
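
With the per-user cap moved upstream into ToUserVectorReducer, this mapper is now
just the pair generator: for a user vector with n non-zero elements it emits every
ordered pair of distinct item indexes, n*(n-1) pairs in all. In miniature (indexes
are hypothetical):

    int[] indexes = {3, 8, 21};
    for (int i : indexes) {
      for (int j : indexes) {
        if (i != j) {
          System.out.println(i + " -> " + j); // 3*2 = 6 pairs for 3 items
        }
      }
    }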