You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/07/05 14:01:26 UTC
svn commit: r960571 - in
/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item:
AggregateAndRecommendReducer.java RecommenderJob.java
Author: srowen
Date: Mon Jul 5 12:01:26 2010
New Revision: 960571
URL: http://svn.apache.org/viewvc?rev=960571&view=rev
Log:
ISSUE-432 add option for --itemsFile from Hui
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=960571&r1=960570&r2=960571&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Mon Jul 5 12:01:26 2010
@@ -26,6 +26,7 @@ import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,9 +34,11 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.FileLineIterable;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
@@ -48,7 +51,10 @@ public final class AggregateAndRecommend
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String NUM_RECOMMENDATIONS = "numRecommendations";
static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
-
+ static final String ITEMS_FILE="itemsFile";
+
+ private FastIDSet itemsToRecommendFor;
+
private static final PathFilter PARTS_FILTER = new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -81,6 +87,23 @@ public final class AggregateAndRecommend
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
+
+ try {
+ FileSystem fs = FileSystem.get(jobConf);
+ String usersFilePathString = jobConf.get(ITEMS_FILE);
+ if (usersFilePathString == null) {
+ itemsToRecommendFor = null;
+ } else {
+ itemsToRecommendFor = new FastIDSet();
+ Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+ FSDataInputStream in = fs.open(usersFilePath);
+ for (String line : new FileLineIterable(in)) {
+ itemsToRecommendFor.add(Long.parseLong(line));
+ }
+ }
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
}
@Override
@@ -106,13 +129,17 @@ public final class AggregateAndRecommend
while (recommendationVectorIterator.hasNext()) {
Vector.Element element = recommendationVectorIterator.next();
int index = element.index();
- float value = (float) element.get();
- if (!Float.isNaN(value)) {
- if (topItems.size() < recommendationsPerUser) {
- topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
- } else if (value > topItems.peek().getValue()) {
- topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
- topItems.poll();
+
+ long itemId = indexItemIDMap.get(index);
+ if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemId)) {
+ float value = (float) element.get();
+ if (!Float.isNaN(value)) {
+ if (topItems.size() < recommendationsPerUser) {
+ topItems.add(new GenericRecommendedItem(itemId, value));
+ } else if (value > topItems.peek().getValue()) {
+ topItems.add(new GenericRecommendedItem(itemId, value));
+ topItems.poll();
+ }
}
}
}
@@ -125,4 +152,4 @@ public final class AggregateAndRecommend
}
}
-}
\ No newline at end of file
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=960571&r1=960570&r2=960571&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Mon Jul 5 12:01:26 2010
@@ -52,6 +52,7 @@ import org.apache.mahout.math.VectorWrit
* for which recommendations should be computed, one per line</li>
* <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
* <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
+ * <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
* <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
* <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
* <li>--maxPrefsPerUserConsidered (integer): Maximum number of preferences considered per user in
@@ -77,6 +78,7 @@ public final class RecommenderJob extend
addOption("numRecommendations", "n", "Number of recommendations per user",
String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
addOption("usersFile", "u", "File of users to recommend for", null);
+ addOption("itemsFile", "u", "File of items to recommend for", null);
addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
addOption("maxPrefsPerUserConsidered", null,
"Maximum number of preferences considered per user in final recommendation phase",
@@ -95,6 +97,7 @@ public final class RecommenderJob extend
Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
String usersFile = parsedArgs.get("--usersFile");
+ String itemsFile = parsedArgs.get("--itemsFile");
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
int maxPrefsPerUserConsidered = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserConsidered"));
int maxCooccurrencesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItemConsidered"));
@@ -170,12 +173,16 @@ public final class RecommenderJob extend
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
Job aggregateAndRecommend = prepareJob(
partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration jobConf = aggregateAndRecommend.getConfiguration();
+ if (itemsFile != null) {
+ jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+ }
setIOSort(aggregateAndRecommend);
aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());