Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/08 09:28:25 UTC
svn commit: r1490930 - in /mahout/trunk: ./
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/...
Author: ssc
Date: Sat Jun 8 07:28:24 2013
New Revision: 1490930
URL: http://svn.apache.org/r1490930
Log:
MAHOUT-974 org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob use integer as userId and itemId
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Sat Jun 8 07:28:24 2013
@@ -2,6 +2,8 @@ Mahout Change Log
Release 0.8 - unreleased
+ MAHOUT-974: org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob use integer as userId and itemId (ssc)
+
MAHOUT-1052: Add an option to MinHashDriver that specifies the dimension of vector to hash (indexes or values) (Elena Smirnova via smarthi)
MAHOUT-1237: Total cluster cost isn't computed properly (dfilimon)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java Sat Jun 8 07:28:24 2013
@@ -45,6 +45,10 @@ public class MutableRecommendedItem impl
return value;
}
+ public void setItemID(long itemID) {
+ this.itemID = itemID;
+ }
+
public void set(long itemID, float value) {
this.itemID = itemID;
this.value = value;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Sat Jun 8 07:28:24 2013
@@ -35,11 +35,13 @@ import java.util.regex.Pattern;
*/
public final class TasteHadoopUtils {
+ public static final int USER_ID_POS = 0;
+ public static final int ITEM_ID_POS = 1;
+
/** Standard delimiter of textual preference data */
private static final Pattern PREFERENCE_TOKEN_DELIMITER = Pattern.compile("[\t,]");
- private TasteHadoopUtils() {
- }
+ private TasteHadoopUtils() {}
/**
* Splits a preference data line into string tokens
@@ -55,12 +57,18 @@ public final class TasteHadoopUtils {
return 0x7FFFFFFF & Longs.hashCode(id);
}
+ public static int readID(String token, boolean usesLongIDs) {
+ return usesLongIDs ?
+ TasteHadoopUtils.idToIndex(Long.parseLong(token))
+ : Integer.parseInt(token);
+ }
+
/**
* Reads a binary mapping file
*/
- public static OpenIntLongHashMap readItemIDIndexMap(String itemIDIndexPathStr, Configuration conf) {
- OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
- Path itemIDIndexPath = new Path(itemIDIndexPathStr);
+ public static OpenIntLongHashMap readIDIndexMap(String idIndexPathStr, Configuration conf) {
+ OpenIntLongHashMap indexIDMap = new OpenIntLongHashMap();
+ Path itemIDIndexPath = new Path(idIndexPathStr);
for (Pair<VarIntWritable,VarLongWritable> record
: new SequenceFileDirIterable<VarIntWritable,VarLongWritable>(itemIDIndexPath,
PathType.LIST,
@@ -68,9 +76,9 @@ public final class TasteHadoopUtils {
null,
true,
conf)) {
- indexItemIDMap.put(record.getFirst().get(), record.getSecond().get());
+ indexIDMap.put(record.getFirst().get(), record.getSecond().get());
}
- return indexItemIDMap;
+ return indexIDMap;
}
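
For reference, a minimal sketch (not part of this commit) of how the new readID helper behaves. The harness class ReadIDSketch and its main() are assumptions for illustration; only TasteHadoopUtils.readID and TasteHadoopUtils.idToIndex come from the patch above.

    import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;

    public class ReadIDSketch {
      public static void main(String[] args) {
        // usesLongIDs == false: the token is parsed as a plain int ID
        int intID = TasteHadoopUtils.readID("42", false);

        // usesLongIDs == true: the token is parsed as a long and hashed into the
        // non-negative int range via idToIndex (0x7FFFFFFF & Longs.hashCode(id))
        int index = TasteHadoopUtils.readID("5568227754922264005", true);

        // idToIndex yields the same index for the same long ID
        boolean consistent = index == TasteHadoopUtils.idToIndex(5568227754922264005L);
        System.out.println(intID + " " + index + " " + consistent);
      }
    }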
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java Sat Jun 8 07:28:24 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
import com.google.common.base.Charsets;
import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,6 +76,7 @@ public class FactorizationEvaluator exte
addInputOption();
addOption("userFeatures", null, "path to the user feature matrix", true);
addOption("itemFeatures", null, "path to the item feature matrix", true);
+ addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
addOutputOption();
Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -87,8 +89,16 @@ public class FactorizationEvaluator exte
Job predictRatings = prepareJob(getInputPath(), errors, TextInputFormat.class, PredictRatingsMapper.class,
DoubleWritable.class, NullWritable.class, SequenceFileOutputFormat.class);
- predictRatings.getConfiguration().set(USER_FEATURES_PATH, getOption("userFeatures"));
- predictRatings.getConfiguration().set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
+ Configuration conf = predictRatings.getConfiguration();
+ conf.set(USER_FEATURES_PATH, getOption("userFeatures"));
+ conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
+
+ boolean usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs"));
+ if (usesLongIDs) {
+ conf.set(ParallelALSFactorizationJob.USES_LONG_IDS, String.valueOf(true));
+ }
+
+
boolean succeeded = predictRatings.waitForCompletion(true);
if (!succeeded) {
return -1;
@@ -125,27 +135,36 @@ public class FactorizationEvaluator exte
private OpenIntObjectHashMap<Vector> U;
private OpenIntObjectHashMap<Vector> M;
+ private boolean usesLongIDs;
+
+ private final DoubleWritable error = new DoubleWritable();
+
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
- Path pathToU = new Path(ctx.getConfiguration().get(USER_FEATURES_PATH));
- Path pathToM = new Path(ctx.getConfiguration().get(ITEM_FEATURES_PATH));
+ Configuration conf = ctx.getConfiguration();
+
+ Path pathToU = new Path(conf.get(USER_FEATURES_PATH));
+ Path pathToM = new Path(conf.get(ITEM_FEATURES_PATH));
- U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
- M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+ U = ALS.readMatrixByRows(pathToU, conf);
+ M = ALS.readMatrixByRows(pathToM, conf);
+
+ usesLongIDs = conf.getBoolean(ParallelALSFactorizationJob.USES_LONG_IDS, false);
}
@Override
protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
- int userID = Integer.parseInt(tokens[0]);
- int itemID = Integer.parseInt(tokens[1]);
+
+ int userID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.USER_ID_POS], usesLongIDs);
+ int itemID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.ITEM_ID_POS], usesLongIDs);
double rating = Double.parseDouble(tokens[2]);
if (U.containsKey(userID) && M.containsKey(itemID)) {
double estimate = U.get(userID).dot(M.get(itemID));
- double err = rating - estimate;
- ctx.write(new DoubleWritable(err), NullWritable.get());
+ error.set(rating - estimate);
+ ctx.write(error, NullWritable.get());
}
}
}
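
A hedged usage sketch for the new option: evaluating a factorization on held-out preferences that use long IDs. The paths and the EvaluateWithLongIDs harness are placeholders; the options mirror the ones registered in run() above, and the evaluator is assumed to be driven through ToolRunner as an AbstractJob.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.cf.taste.hadoop.als.FactorizationEvaluator;

    public class EvaluateWithLongIDs {
      public static void main(String[] args) throws Exception {
        // placeholder paths; U/ and M/ come from a previous ParallelALSFactorizationJob run
        ToolRunner.run(new FactorizationEvaluator(), new String[] {
            "--input", "/path/to/heldOutPrefs",
            "--userFeatures", "/path/to/als-output/U/",
            "--itemFeatures", "/path/to/als-output/M/",
            "--usesLongIDs", "true",
            "--output", "/path/to/rmse" });
      }
    }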
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Sat Jun 8 07:28:24 2013
@@ -17,6 +17,7 @@
package org.apache.mahout.cf.taste.hadoop.als;
+import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -48,6 +49,8 @@ import org.apache.mahout.common.mapreduc
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
@@ -88,12 +91,16 @@ public class ParallelALSFactorizationJob
static final String ALPHA = ParallelALSFactorizationJob.class.getName() + ".alpha";
static final String NUM_ENTITIES = ParallelALSFactorizationJob.class.getName() + ".numEntities";
+ static final String USES_LONG_IDS = ParallelALSFactorizationJob.class.getName() + ".usesLongIDs";
+ static final String TOKEN_POS = ParallelALSFactorizationJob.class.getName() + ".tokenPos";
+
private boolean implicitFeedback;
private int numIterations;
private int numFeatures;
private double lambda;
private double alpha;
private int numThreadsPerSolver;
+ private boolean usesLongIDs;
private int numItems;
private int numUsers;
@@ -115,6 +122,7 @@ public class ParallelALSFactorizationJob
addOption("numFeatures", null, "dimension of the feature space", true);
addOption("numIterations", null, "number of iterations", true);
addOption("numThreadsPerSolver", null, "threads per solver mapper", String.valueOf(1));
+ addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
Map<String,List<String>> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -128,6 +136,7 @@ public class ParallelALSFactorizationJob
implicitFeedback = Boolean.parseBoolean(getOption("implicitFeedback"));
numThreadsPerSolver = Integer.parseInt(getOption("numThreadsPerSolver"));
+ usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs", String.valueOf(false)));
/*
* compute the factorization A = U M'
@@ -137,12 +146,27 @@ public class ParallelALSFactorizationJob
* M (items x features) is the representation of items in the feature space
*/
+ if (usesLongIDs) {
+ Job mapUsers = prepareJob(getInputPath(), getOutputPath("userIDIndex"), TextInputFormat.class,
+ MapLongIDsMapper.class, VarIntWritable.class, VarLongWritable.class, IDMapReducer.class,
+ VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+ mapUsers.getConfiguration().set(TOKEN_POS, String.valueOf(TasteHadoopUtils.USER_ID_POS));
+ mapUsers.waitForCompletion(true);
+
+ Job mapItems = prepareJob(getInputPath(), getOutputPath("itemIDIndex"), TextInputFormat.class,
+ MapLongIDsMapper.class, VarIntWritable.class, VarLongWritable.class, IDMapReducer.class,
+ VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+ mapItems.getConfiguration().set(TOKEN_POS, String.valueOf(TasteHadoopUtils.ITEM_ID_POS));
+ mapItems.waitForCompletion(true);
+ }
+
/* create A' */
Job itemRatings = prepareJob(getInputPath(), pathToItemRatings(),
TextInputFormat.class, ItemRatingVectorsMapper.class, IntWritable.class,
VectorWritable.class, VectorSumReducer.class, IntWritable.class,
VectorWritable.class, SequenceFileOutputFormat.class);
itemRatings.setCombinerClass(VectorSumCombiner.class);
+ itemRatings.getConfiguration().set(USES_LONG_IDS, String.valueOf(usesLongIDs));
boolean succeeded = itemRatings.waitForCompletion(true);
if (!succeeded) {
return -1;
@@ -267,11 +291,18 @@ public class ParallelALSFactorizationJob
private final VectorWritable ratingsWritable = new VectorWritable(true);
private final Vector ratings = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+ private boolean usesLongIDs;
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ usesLongIDs = ctx.getConfiguration().getBoolean(USES_LONG_IDS, false);
+ }
+
@Override
protected void map(LongWritable offset, Text line, Context ctx) throws IOException, InterruptedException {
String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
- int userID = Integer.parseInt(tokens[0]);
- int itemID = Integer.parseInt(tokens[1]);
+ int userID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.USER_ID_POS], usesLongIDs);
+ int itemID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.ITEM_ID_POS], usesLongIDs);
float rating = Float.parseFloat(tokens[2]);
ratings.setQuick(userID, rating);
@@ -354,6 +385,38 @@ public class ParallelALSFactorizationJob
}
}
+ static class MapLongIDsMapper extends Mapper<LongWritable,Text,VarIntWritable,VarLongWritable> {
+
+ private int tokenPos;
+ private VarIntWritable index = new VarIntWritable();
+ private VarLongWritable idWritable = new VarLongWritable();
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ tokenPos = ctx.getConfiguration().getInt(TOKEN_POS, -1);
+ Preconditions.checkState(tokenPos >= 0);
+ }
+
+ @Override
+ protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException {
+ String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
+
+ long id = Long.parseLong(tokens[tokenPos]);
+
+ index.set(TasteHadoopUtils.idToIndex(id));
+ idWritable.set(id);
+ ctx.write(index, idWritable);
+ }
+ }
+
+ static class IDMapReducer extends Reducer<VarIntWritable,VarLongWritable,VarIntWritable,VarLongWritable> {
+ @Override
+ protected void reduce(VarIntWritable index, Iterable<VarLongWritable> ids, Context ctx)
+ throws IOException, InterruptedException {
+ ctx.write(index, ids.iterator().next());
+ }
+ }
+
private Path pathToM(int iteration) {
return iteration == numIterations - 1 ? getOutputPath("M") : getTempPath("M-" + iteration);
}
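
The two pre-processing jobs above persist (index, original long ID) pairs under userIDIndex and itemIDIndex in the job's output path. A minimal sketch, assuming those output paths and a local Configuration (IDIndexSketch is an illustrative harness), of reading one of them back with the renamed TasteHadoopUtils.readIDIndexMap:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
    import org.apache.mahout.math.map.OpenIntLongHashMap;

    public class IDIndexSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // index -> original long user ID, written by MapLongIDsMapper/IDMapReducer
        OpenIntLongHashMap userIDIndex =
            TasteHadoopUtils.readIDIndexMap("/path/to/als-output/userIDIndex", conf);

        // recover the original ID from the hashed index used inside the job
        long originalUserID = userIDIndex.get(TasteHadoopUtils.idToIndex(5568227754922264005L));
        System.out.println(originalUserID);
      }
    }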
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java Sat Jun 8 07:28:24 2013
@@ -17,16 +17,20 @@
package org.apache.mahout.cf.taste.hadoop.als;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.Pair;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.IntObjectProcedure;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
import org.apache.mahout.math.map.OpenIntObjectHashMap;
import org.apache.mahout.math.set.OpenIntHashSet;
@@ -37,34 +41,47 @@ import java.util.List;
* a multithreaded mapper that loads the feature matrices U and M into memory. Afterwards it computes recommendations
* from these. Can be executed by a {@link MultithreadedSharingMapper}.
*/
-public class PredictionMapper extends SharingMapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable,
+public class PredictionMapper extends SharingMapper<IntWritable,VectorWritable,LongWritable,RecommendedItemsWritable,
Pair<OpenIntObjectHashMap<Vector>,OpenIntObjectHashMap<Vector>>> {
private int recommendationsPerUser;
private float maxRating;
+ private boolean usesLongIDs;
+ private OpenIntLongHashMap userIDIndex;
+ private OpenIntLongHashMap itemIDIndex;
+
+ private final LongWritable userIDWritable = new LongWritable();
private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
@Override
Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> createSharedInstance(Context ctx) {
- Path pathToU = new Path(ctx.getConfiguration().get(RecommenderJob.USER_FEATURES_PATH));
- Path pathToM = new Path(ctx.getConfiguration().get(RecommenderJob.ITEM_FEATURES_PATH));
+ Configuration conf = ctx.getConfiguration();
+ Path pathToU = new Path(conf.get(RecommenderJob.USER_FEATURES_PATH));
+ Path pathToM = new Path(conf.get(RecommenderJob.ITEM_FEATURES_PATH));
- OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
- OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+ OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, conf);
+ OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, conf);
return new Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>>(U, M);
}
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
- recommendationsPerUser = ctx.getConfiguration().getInt(RecommenderJob.NUM_RECOMMENDATIONS,
- RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
- maxRating = Float.parseFloat(ctx.getConfiguration().get(RecommenderJob.MAX_RATING));
+ Configuration conf = ctx.getConfiguration();
+ recommendationsPerUser = conf.getInt(RecommenderJob.NUM_RECOMMENDATIONS,
+ RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
+ maxRating = Float.parseFloat(conf.get(RecommenderJob.MAX_RATING));
+
+ usesLongIDs = conf.getBoolean(ParallelALSFactorizationJob.USES_LONG_IDS, false);
+ if (usesLongIDs) {
+ userIDIndex = TasteHadoopUtils.readIDIndexMap(conf.get(RecommenderJob.USER_INDEX_PATH), conf);
+ itemIDIndex = TasteHadoopUtils.readIDIndexMap(conf.get(RecommenderJob.ITEM_INDEX_PATH), conf);
+ }
}
@Override
- protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
+ protected void map(IntWritable userIndexWritable, VectorWritable ratingsWritable, Context ctx)
throws IOException, InterruptedException {
Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> uAndM = getSharedInstance();
@@ -72,7 +89,7 @@ public class PredictionMapper extends Sh
OpenIntObjectHashMap<Vector> M = uAndM.getSecond();
Vector ratings = ratingsWritable.get();
- final int userID = userIDWritable.get();
+ final int userIndex = userIndexWritable.get();
final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
for (Vector.Element e : ratings.nonZeroes()) {
@@ -80,7 +97,7 @@ public class PredictionMapper extends Sh
}
final TopItemsQueue topItemsQueue = new TopItemsQueue(recommendationsPerUser);
- final Vector userFeatures = U.get(userID);
+ final Vector userFeatures = U.get(userIndex);
M.forEachPair(new IntObjectProcedure<Vector>() {
@Override
@@ -107,6 +124,20 @@ public class PredictionMapper extends Sh
((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
}
+ if (usesLongIDs) {
+ long userID = userIDIndex.get(userIndex);
+ userIDWritable.set(userID);
+
+ for (RecommendedItem topItem : recommendedItems) {
+ // remap item IDs
+ long itemID = itemIDIndex.get((int) topItem.getItemID());
+ ((MutableRecommendedItem) topItem).setItemID(itemID);
+ }
+
+ } else {
+ userIDWritable.set(userIndex);
+ }
+
recommendations.set(recommendedItems);
ctx.write(userIDWritable, recommendations);
}
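
In isolation, the remapping step added to map() above boils down to a reverse lookup from the hashed index to the original long ID, using the new MutableRecommendedItem.setItemID setter. A toy sketch, assuming MutableRecommendedItem's no-arg constructor and a hand-built index map (in the job the map is loaded via TasteHadoopUtils.readIDIndexMap):

    import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
    import org.apache.mahout.math.map.OpenIntLongHashMap;

    public class RemapSketch {
      public static void main(String[] args) {
        // toy index map: hashed index -> original long item ID
        OpenIntLongHashMap itemIDIndex = new OpenIntLongHashMap();
        itemIDIndex.put(7, -4758971626494767444L);

        // items are scored under their int index during recommendation...
        MutableRecommendedItem topItem = new MutableRecommendedItem();
        topItem.set(7, 4.5f);

        // ...and remapped to the original long ID before being written out
        topItem.setItemID(itemIDIndex.get((int) topItem.getItemID()));
        System.out.println(topItem.getItemID()); // -4758971626494767444
      }
    }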
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Sat Jun 8 07:28:24 2013
@@ -49,6 +49,8 @@ public class RecommenderJob extends Abst
static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
+ static final String USER_INDEX_PATH = RecommenderJob.class.getName() + ".userIndex";
+ static final String ITEM_INDEX_PATH = RecommenderJob.class.getName() + ".itemIndex";
static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
@@ -66,6 +68,9 @@ public class RecommenderJob extends Abst
String.valueOf(DEFAULT_NUM_RECOMMENDATIONS));
addOption("maxRating", null, "maximum rating available", true);
addOption("numThreads", null, "threads per mapper", String.valueOf(1));
+ addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
+ addOption("userIDIndex", null, "index for user long IDs (necessary if usesLongIDs is true)");
+ addOption("itemIDIndex", null, "index for user long IDs (necessary if usesLongIDs is true)");
addOutputOption();
Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -84,6 +89,13 @@ public class RecommenderJob extends Abst
conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
conf.set(MAX_RATING, getOption("maxRating"));
+ boolean usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs"));
+ if (usesLongIDs) {
+ conf.set(ParallelALSFactorizationJob.USES_LONG_IDS, String.valueOf(true));
+ conf.set(USER_INDEX_PATH, getOption("userIDIndex"));
+ conf.set(ITEM_INDEX_PATH, getOption("itemIDIndex"));
+ }
+
MultithreadedMapper.setMapperClass(prediction, PredictionMapper.class);
MultithreadedMapper.setNumberOfThreads(prediction, numThreads);
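
End to end, the new options are wired together as in the recommenderJobWithIDMapping test further down in this mail. A hedged sketch of the equivalent standalone invocation (placeholder paths, ToolRunner and the RecommendWithLongIDs harness assumed for illustration):

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.cf.taste.hadoop.als.RecommenderJob;

    public class RecommendWithLongIDs {
      public static void main(String[] args) throws Exception {
        // userRatings/, U/, M/, userIDIndex/ and itemIDIndex/ are all outputs of
        // a ParallelALSFactorizationJob run with --usesLongIDs true
        ToolRunner.run(new RecommenderJob(), new String[] {
            "--input", "/path/to/als-output/userRatings/",
            "--userFeatures", "/path/to/als-output/U/",
            "--itemFeatures", "/path/to/als-output/M/",
            "--numRecommendations", "2",
            "--maxRating", "5.0",
            "--numThreads", "2",
            "--usesLongIDs", "true",
            "--userIDIndex", "/path/to/als-output/userIDIndex/",
            "--itemIDIndex", "/path/to/als-output/itemIDIndex/",
            "--output", "/path/to/recommendations" });
      }
    }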
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Sat Jun 8 07:28:24 2013
@@ -76,7 +76,7 @@ public final class AggregateAndRecommend
Configuration conf = context.getConfiguration();
recommendationsPerUser = conf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
booleanData = conf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
- indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf);
+ indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf);
String itemFilePathString = conf.get(ITEMS_FILE);
if (itemFilePathString != null) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Sat Jun 8 07:28:24 2013
@@ -184,7 +184,7 @@ public final class ItemSimilarityJob ext
protected void setup(Context ctx) {
Configuration conf = ctx.getConfiguration();
maxSimilarItemsPerItem = conf.getInt(MAX_SIMILARITIES_PER_ITEM, -1);
- indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ITEM_ID_INDEX_PATH_STR), conf);
+ indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEM_ID_INDEX_PATH_STR), conf);
Preconditions.checkArgument(maxSimilarItemsPerItem > 0, "maxSimilarItemsPerItem was not correctly set!");
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java Sat Jun 8 07:28:24 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.impl.TasteTestCase;
import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
import org.apache.mahout.cf.taste.impl.common.RunningAverage;
@@ -29,6 +30,8 @@ import org.apache.mahout.math.SparseRowM
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -41,6 +44,7 @@ public class ParallelALSFactorizationJob
private static final Logger log = LoggerFactory.getLogger(ParallelALSFactorizationJobTest.class);
private File inputFile;
+ private File intermediateDir;
private File outputDir;
private File tmpDir;
private Configuration conf;
@@ -50,6 +54,8 @@ public class ParallelALSFactorizationJob
public void setUp() throws Exception {
super.setUp();
inputFile = getTestTempFile("prefs.txt");
+ intermediateDir = getTestTempDir("intermediate");
+ intermediateDir.delete();
outputDir = getTestTempDir("output");
outputDir.delete();
tmpDir = getTestTempDir("tmp");
@@ -225,6 +231,74 @@ public class ParallelALSFactorizationJob
assertTrue(rmse < 0.4);
}
+ @Test
+ public void exampleWithIDMapping() throws Exception {
+
+ String[] preferencesWithLongIDs = new String[] {
+ "5568227754922264005,-4758971626494767444,5.0",
+ "5568227754922264005,3688396615879561990,5.0",
+ "5568227754922264005,4594226737871995304,2.0",
+ "550945997885173934,-4758971626494767444,2.0",
+ "550945997885173934,4594226737871995304,3.0",
+ "550945997885173934,706816485922781596,5.0",
+ "2448095297482319463,3688396615879561990,5.0",
+ "2448095297482319463,706816485922781596,3.0",
+ "6839920411763636962,-4758971626494767444,3.0",
+ "6839920411763636962,706816485922781596,5.0" };
+
+ writeLines(inputFile, preferencesWithLongIDs);
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+
+ alsFactorization.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(), "--lambda", String.valueOf(lambda),
+ "--numFeatures", String.valueOf(numFeatures), "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(1), "--usesLongIDs", String.valueOf(true) });
+
+
+ OpenIntLongHashMap userIDIndex =
+ TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/userIDIndex/part-r-00000", conf);
+ assertEquals(4, userIDIndex.size());
+
+ OpenIntLongHashMap itemIDIndex =
+ TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/itemIDIndex/part-r-00000", conf);
+ assertEquals(4, itemIDIndex.size());
+
+ OpenIntObjectHashMap<Vector> u =
+ MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "U/part-m-00000"));
+ OpenIntObjectHashMap<Vector> m =
+ MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "M/part-m-00000"));
+
+ assertEquals(4, u.size());
+ assertEquals(4, m.size());
+
+ RunningAverage avg = new FullRunningAverage();
+ for (String line : preferencesWithLongIDs) {
+ String[] tokens = TasteHadoopUtils.splitPrefTokens(line);
+ long userID = Long.parseLong(tokens[TasteHadoopUtils.USER_ID_POS]);
+ long itemID = Long.parseLong(tokens[TasteHadoopUtils.ITEM_ID_POS]);
+ double rating = Double.parseDouble(tokens[2]);
+
+ Vector userFeatures = u.get(TasteHadoopUtils.idToIndex(userID));
+ Vector itemFeatures = m.get(TasteHadoopUtils.idToIndex(itemID));
+
+ double estimate = userFeatures.dot(itemFeatures);
+
+ double err = rating - estimate;
+ avg.addDatum(err * err);
+ }
+
+ double rmse = Math.sqrt(avg.getAverage());
+ log.info("RMSE: {}", rmse);
+
+ assertTrue(rmse < 0.2);
+ }
+
protected static String preferencesAsText(Matrix preferences) {
StringBuilder prefsAsText = new StringBuilder();
String separator = "";
@@ -237,8 +311,64 @@ public class ParallelALSFactorizationJob
}
}
}
+ System.out.println(prefsAsText.toString());
return prefsAsText.toString();
}
+ @Test
+ public void recommenderJobWithIDMapping() throws Exception {
+
+ String[] preferencesWithLongIDs = new String[] {
+ "5568227754922264005,-4758971626494767444,5.0",
+ "5568227754922264005,3688396615879561990,5.0",
+ "5568227754922264005,4594226737871995304,2.0",
+ "550945997885173934,-4758971626494767444,2.0",
+ "550945997885173934,4594226737871995304,3.0",
+ "550945997885173934,706816485922781596,5.0",
+ "2448095297482319463,3688396615879561990,5.0",
+ "2448095297482319463,706816485922781596,3.0",
+ "6839920411763636962,-4758971626494767444,3.0",
+ "6839920411763636962,706816485922781596,5.0" };
+
+ writeLines(inputFile, preferencesWithLongIDs);
+
+ ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+ alsFactorization.setConf(conf);
+
+ int numFeatures = 3;
+ int numIterations = 5;
+ double lambda = 0.065;
+
+ int success = alsFactorization.run(new String[] {
+ "--input", inputFile.getAbsolutePath(),
+ "--output", intermediateDir.getAbsolutePath(),
+ "--tempDir", tmpDir.getAbsolutePath(),
+ "--lambda", String.valueOf(lambda),
+ "--numFeatures", String.valueOf(numFeatures),
+ "--numIterations", String.valueOf(numIterations),
+ "--numThreadsPerSolver", String.valueOf(1),
+ "--usesLongIDs", String.valueOf(true) });
+
+ assertEquals(0, success);
+
+ // reset as we run in the same JVM
+ SharingMapper.reset();
+
+ RecommenderJob recommender = new RecommenderJob();
+
+ success = recommender.run(new String[] {
+ "--input", intermediateDir.getAbsolutePath() + "/userRatings/",
+ "--userFeatures", intermediateDir.getAbsolutePath() + "/U/",
+ "--itemFeatures", intermediateDir.getAbsolutePath() + "/M/",
+ "--numRecommendations", String.valueOf(2),
+ "--maxRating", String.valueOf(5.0),
+ "--numThreads", String.valueOf(2),
+ "--usesLongIDs", String.valueOf(true),
+ "--userIDIndex", intermediateDir.getAbsolutePath() + "/userIDIndex/",
+ "--itemIDIndex", intermediateDir.getAbsolutePath() + "/itemIDIndex/",
+ "--output", outputDir.getAbsolutePath() });
+
+ assertEquals(0, success);
+ }
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Sat Jun 8 07:28:24 2013
@@ -39,6 +39,7 @@ import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
import org.easymock.IArgumentMatcher;
import org.easymock.EasyMock;
import org.junit.Assert;
@@ -51,43 +52,6 @@ public final class MathHelper {
private MathHelper() {}
/**
- * applies an {@link IArgumentMatcher} to {@link MatrixEntryWritable}s
- */
- public static MatrixEntryWritable matrixEntryMatches(final int row, final int col, final double value) {
- EasyMock.reportMatcher(new IArgumentMatcher() {
- @Override
- public boolean matches(Object argument) {
- if (argument instanceof MatrixEntryWritable) {
- MatrixEntryWritable entry = (MatrixEntryWritable) argument;
- return row == entry.getRow()
- && col == entry.getCol()
- && Math.abs(value - entry.getVal()) <= MahoutTestCase.EPSILON;
- }
- return false;
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append("MatrixEntry[row=").append(row)
- .append(",col=").append(col)
- .append(",value=").append(value).append(']');
- }
- });
- return null;
- }
-
- /**
- * convenience method to create a {@link MatrixEntryWritable}
- */
- public static MatrixEntryWritable matrixEntry(int row, int col, double value) {
- MatrixEntryWritable entry = new MatrixEntryWritable();
- entry.setRow(row);
- entry.setCol(col);
- entry.setVal(value);
- return entry;
- }
-
- /**
* convenience method to create a {@link Vector.Element}
*/
public static Vector.Element elem(int index, double value) {
@@ -190,6 +154,26 @@ public final class MathHelper {
}
/**
+ * reads the rows of a SequenceFile&lt;IntWritable,VectorWritable&gt; into a map from row index to row vector
+ */
+ public static OpenIntObjectHashMap<Vector> readMatrixRows(Configuration conf, Path path) {
+ boolean readOneRow = false;
+ OpenIntObjectHashMap<Vector> rows = new OpenIntObjectHashMap<Vector>();
+ for (Pair<IntWritable,VectorWritable> record :
+ new SequenceFileIterable<IntWritable,VectorWritable>(path, true, conf)) {
+ readOneRow = true;
+ rows.put(record.getFirst().get(), record.getSecond().get());
+ }
+ if (!readOneRow) {
+ throw new IllegalStateException("Not a single row read!");
+ }
+ return rows;
+ }
+
+ /**
* write a two-dimensional double array to an SequenceFile<IntWritable,VectorWritable>
*/
public static void writeDistributedRowMatrix(double[][] entries, FileSystem fs, Configuration conf, Path path)