You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/07/05 14:01:26 UTC

svn commit: r960571 - in /mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item: AggregateAndRecommendReducer.java RecommenderJob.java

Author: srowen
Date: Mon Jul  5 12:01:26 2010
New Revision: 960571

URL: http://svn.apache.org/viewvc?rev=960571&view=rev
Log:
ISSUE-432: add an --itemsFile option (patch from Hui)

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=960571&r1=960570&r2=960571&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Mon Jul  5 12:01:26 2010
@@ -26,6 +26,7 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,9 +34,11 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.FileLineIterable;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
@@ -48,7 +51,10 @@ public final class AggregateAndRecommend
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String NUM_RECOMMENDATIONS = "numRecommendations";
   static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
-
+  static final String ITEMS_FILE="itemsFile";
+  
+  private FastIDSet itemsToRecommendFor;
+  
   private static final PathFilter PARTS_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -81,6 +87,23 @@ public final class AggregateAndRecommend
     } catch (IOException ioe) {
       throw new IllegalStateException(ioe);
     }
+    
+    try {
+        FileSystem fs = FileSystem.get(jobConf);
+        String usersFilePathString = jobConf.get(ITEMS_FILE);
+        if (usersFilePathString == null) {
+        	itemsToRecommendFor = null;
+        } else {
+        	itemsToRecommendFor = new FastIDSet();
+          Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+          FSDataInputStream in = fs.open(usersFilePath);
+          for (String line : new FileLineIterable(in)) {
+        	  itemsToRecommendFor.add(Long.parseLong(line));
+          }
+        }
+      } catch (IOException ioe) {
+        throw new IllegalStateException(ioe);
+      }
   }
 
   @Override
@@ -106,13 +129,17 @@ public final class AggregateAndRecommend
     while (recommendationVectorIterator.hasNext()) {
       Vector.Element element = recommendationVectorIterator.next();
       int index = element.index();
-      float value = (float) element.get();
-      if (!Float.isNaN(value)) {
-        if (topItems.size() < recommendationsPerUser) {
-          topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
-        } else if (value > topItems.peek().getValue()) {
-          topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
-          topItems.poll();
+  
+      long itemId = indexItemIDMap.get(index);
+      if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemId)) {
+        float value = (float) element.get();
+        if (!Float.isNaN(value)) {
+          if (topItems.size() < recommendationsPerUser) {
+            topItems.add(new GenericRecommendedItem(itemId, value));
+          } else if (value > topItems.peek().getValue()) {
+            topItems.add(new GenericRecommendedItem(itemId, value));
+            topItems.poll();
+          }
         }
       }
     }
@@ -125,4 +152,4 @@ public final class AggregateAndRecommend
     }
   }
 
-}
\ No newline at end of file
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=960571&r1=960570&r2=960571&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Mon Jul  5 12:01:26 2010
@@ -52,6 +52,7 @@ import org.apache.mahout.math.VectorWrit
  *  for which recommendations should be computed, one per line</li>
  * <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
  * <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
+ * <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
  * <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
  * <li>--booleanData (boolean): Treat input data as having to pref values (false)</li>
  * <li>--maxPrefsPerUserConsidered (integer): Maximum number of preferences considered per user in
@@ -77,6 +78,7 @@ public final class RecommenderJob extend
     addOption("numRecommendations", "n", "Number of recommendations per user",
       String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
     addOption("usersFile", "u", "File of users to recommend for", null);
+    addOption("itemsFile", "u", "File of items to recommend for", null);
     addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
     addOption("maxPrefsPerUserConsidered", null,
       "Maximum number of preferences considered per user in final recommendation phase",
@@ -95,6 +97,7 @@ public final class RecommenderJob extend
     Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
     int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     String usersFile = parsedArgs.get("--usersFile");
+    String itemsFile = parsedArgs.get("--itemsFile");
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
     int maxPrefsPerUserConsidered = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserConsidered"));
     int maxCooccurrencesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItemConsidered"));
@@ -170,12 +173,16 @@ public final class RecommenderJob extend
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+    
       Job aggregateAndRecommend = prepareJob(
           partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
           PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
           AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
           TextOutputFormat.class);
       Configuration jobConf = aggregateAndRecommend.getConfiguration();
+      if (itemsFile != null) {
+    	  jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+    }
       setIOSort(aggregateAndRecommend);
       aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
       jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());