You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/03/21 16:44:17 UTC
svn commit: r1459365 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ examples/bin/
Author: ssc
Date: Thu Mar 21 15:44:17 2013
New Revision: 1459365
URL: http://svn.apache.org/r1459365
Log:
MAHOUT-1169 Multithreaded recommendation computation from a factorization
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveExplicitFeedbackMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java
mahout/trunk/examples/bin/factorize-movielens-1M.sh
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java Thu Mar 21 15:44:17 2013
@@ -24,8 +24,8 @@ import org.apache.hadoop.util.Reflection
import java.io.IOException;
/**
- * Multithreaded Mapper for {@link SharingMapper}s. Will call before() and after() once in the controlling thread
- * before and after executing the mappers using a thread pool.
+ * Multithreaded Mapper for {@link SharingMapper}s. Will call setupSharedInstance() once in the controlling thread
+ * before executing the mappers using a thread pool.
*
* @param <K1>
* @param <V1>
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Thu Mar 21 15:44:17 2013
@@ -73,7 +73,7 @@ import java.util.Random;
* <li>--lambda (double): regularization parameter to avoid overfitting</li>
* <li>--userFeatures (path): path to the user feature matrix</li>
* <li>--itemFeatures (path): path to the item feature matrix</li>
- * <li>--numThreadsPerSolver (int): threads to use per solver mapper, default: 1 (no multithreading)</li>
+ * <li>--numThreadsPerSolver (int): threads to use per solver mapper (default: 1)</li>
* </ol>
*/
public class ParallelALSFactorizationJob extends AbstractJob {
@@ -186,6 +186,9 @@ public class ParallelALSFactorizationJob
writer = new SequenceFile.Writer(fs, getConf(), new Path(pathToM(-1), "part-m-00000"), IntWritable.class,
VectorWritable.class);
+ IntWritable index = new IntWritable();
+ VectorWritable featureVector = new VectorWritable();
+
Iterator<Vector.Element> averages = averageRatings.iterateNonZero();
while (averages.hasNext()) {
Vector.Element e = averages.next();
@@ -194,7 +197,9 @@ public class ParallelALSFactorizationJob
for (int m = 1; m < numFeatures; m++) {
row.setQuick(m, random.nextDouble());
}
- writer.append(new IntWritable(e.index()), new VectorWritable(row));
+ index.set(e.index());
+ featureVector.set(row);
+ writer.append(index, featureVector);
}
} finally {
Closeables.closeQuietly(writer);
@@ -230,48 +235,29 @@ public class ParallelALSFactorizationJob
throws ClassNotFoundException, IOException, InterruptedException {
int iterationNumber = currentIteration + 1;
- Class<? extends Mapper> solverMapperClass;
Class<? extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>> solverMapperClassInternal = null;
String name = null;
- // no multithreading
- if (numThreadsPerSolver == 1) {
-
- if (implicitFeedback) {
- solverMapperClass = SolveImplicitFeedbackMapper.class;
- name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
- "(single-threaded, implicit feedback)";
- } else {
- solverMapperClass = SolveExplicitFeedbackMapper.class;
- name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
- "(single-threaded, explicit feedback)";
- }
+ if (implicitFeedback) {
+ solverMapperClassInternal = SharingSolveImplicitFeedbackMapper.class;
+ name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
+ "(" + numThreadsPerSolver + " threads, implicit feedback)";
} else {
- solverMapperClass = MultithreadedSharingMapper.class;
-
- if (implicitFeedback) {
- solverMapperClassInternal = SharingSolveImplicitFeedbackMapper.class;
- name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
- "(" + numThreadsPerSolver + " threads, implicit feedback)";
- } else {
- solverMapperClassInternal = SharingSolveExplicitFeedbackMapper.class;
- name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
- "(" + numThreadsPerSolver + " threads, explicit feedback)";
- }
+ solverMapperClassInternal = SharingSolveExplicitFeedbackMapper.class;
+ name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
+ "(" + numThreadsPerSolver + " threads, explicit feedback)";
}
- Job solverForUorI = prepareJob(ratings, output, SequenceFileInputFormat.class, solverMapperClass, IntWritable.class,
- VectorWritable.class, SequenceFileOutputFormat.class, name);
+ Job solverForUorI = prepareJob(ratings, output, SequenceFileInputFormat.class, MultithreadedSharingMapper.class,
+ IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, name);
Configuration solverConf = solverForUorI.getConfiguration();
solverConf.set(LAMBDA, String.valueOf(lambda));
solverConf.set(ALPHA, String.valueOf(alpha));
solverConf.setInt(NUM_FEATURES, numFeatures);
solverConf.set(FEATURE_MATRIX, pathToUorI.toString());
- if (numThreadsPerSolver > 1) {
- MultithreadedMapper.setMapperClass(solverForUorI, solverMapperClassInternal);
- MultithreadedMapper.setNumberOfThreads(solverForUorI, numThreadsPerSolver);
- }
+ MultithreadedMapper.setMapperClass(solverForUorI, solverMapperClassInternal);
+ MultithreadedMapper.setNumberOfThreads(solverForUorI, numThreadsPerSolver);
boolean succeeded = solverForUorI.waitForCompletion(true);
if (!succeeded) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Thu Mar 21 15:44:17 2013
@@ -18,24 +18,15 @@
package org.apache.mahout.cf.taste.hadoop.als;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.IntObjectProcedure;
-import org.apache.mahout.math.map.OpenIntObjectHashMap;
-import org.apache.mahout.math.set.OpenIntHashSet;
-import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,17 +38,17 @@ import java.util.Map;
* <ol>
* <li>--input (path): Directory containing the vectorized user ratings</li>
* <li>--output (path): path where output should go</li>
- * <li>--numRecommendations (int): maximum number of recommendations per user</li>
+ * <li>--numRecommendations (int): maximum number of recommendations per user (default: 10)</li>
* <li>--maxRating (double): maximum rating of an item</li>
- * <li>--NUM_FEATURES (int): number of features to use for decomposition </li>
+ * <li>--numThreads (int): threads to use per mapper (default: 1)</li>
* </ol>
*/
public class RecommenderJob extends AbstractJob {
- private static final String NUM_RECOMMENDATIONS = RecommenderJob.class.getName() + ".numRecommendations";
- private static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
- private static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
- private static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
+ static final String NUM_RECOMMENDATIONS = RecommenderJob.class.getName() + ".numRecommendations";
+ static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
+ static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
+ static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
@@ -74,6 +65,7 @@ public class RecommenderJob extends Abst
addOption("numRecommendations", null, "number of recommendations per user",
String.valueOf(DEFAULT_NUM_RECOMMENDATIONS));
addOption("maxRating", null, "maximum rating available", true);
+ addOption("numThreads", null, "threads per mapper", String.valueOf(1));
addOutputOption();
Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -81,15 +73,20 @@ public class RecommenderJob extends Abst
return -1;
}
- Job prediction = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class, PredictionMapper.class,
- IntWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
+ Job prediction = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class,
+ MultithreadedSharingMapper.class, IntWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
Configuration conf = prediction.getConfiguration();
+ int numThreads = Integer.parseInt(getOption("numThreads"));
+
conf.setInt(NUM_RECOMMENDATIONS, Integer.parseInt(getOption("numRecommendations")));
conf.set(USER_FEATURES_PATH, getOption("userFeatures"));
conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
conf.set(MAX_RATING, getOption("maxRating"));
+ MultithreadedMapper.setMapperClass(prediction, SharingPredictionMapper.class);
+ MultithreadedMapper.setNumberOfThreads(prediction, numThreads);
+
boolean succeeded = prediction.waitForCompletion(true);
if (!succeeded) {
return -1;
@@ -98,85 +95,4 @@ public class RecommenderJob extends Abst
return 0;
}
- static class PredictionMapper
- extends Mapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable> {
-
- private OpenIntObjectHashMap<Vector> U;
- private OpenIntObjectHashMap<Vector> M;
-
- private int recommendationsPerUser;
- private float maxRating;
-
- private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
-
- @Override
- protected void setup(Context ctx) throws IOException, InterruptedException {
- recommendationsPerUser = ctx.getConfiguration().getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
-
- Path pathToU = new Path(ctx.getConfiguration().get(USER_FEATURES_PATH));
- Path pathToM = new Path(ctx.getConfiguration().get(ITEM_FEATURES_PATH));
-
- U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
- M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
-
- maxRating = Float.parseFloat(ctx.getConfiguration().get(MAX_RATING));
- }
-
- // we can use a simple dot product computation, as both vectors are dense
- private static double dot(Vector x, Vector y) {
- int numFeatures = x.size();
- double sum = 0;
- for (int n = 0; n < numFeatures; n++) {
- sum += x.getQuick(n) * y.getQuick(n);
- }
- return sum;
- }
-
- @Override
- protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
- throws IOException, InterruptedException {
-
- Vector ratings = ratingsWritable.get();
- final int userID = userIDWritable.get();
- final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
-
- Iterator<Vector.Element> ratingsIterator = ratings.iterateNonZero();
- while (ratingsIterator.hasNext()) {
- alreadyRatedItems.add(ratingsIterator.next().index());
- }
-
- final TopItemQueue topItemQueue = new TopItemQueue(recommendationsPerUser);
- final Vector userFeatures = U.get(userID);
-
- M.forEachPair(new IntObjectProcedure<Vector>() {
- @Override
- public boolean apply(int itemID, Vector itemFeatures) {
- if (!alreadyRatedItems.contains(itemID)) {
- double predictedRating = dot(userFeatures, itemFeatures);
-
- MutableRecommendedItem top = topItemQueue.top();
- if (predictedRating > top.getValue()) {
- top.set(itemID, (float) predictedRating);
- topItemQueue.updateTop();
- }
-
- }
- return true;
- }
- });
-
- List<RecommendedItem> recommendedItems = topItemQueue.getTopItems();
-
- if (!recommendedItems.isEmpty()) {
-
- // cap predictions to maxRating
- for (RecommendedItem topItem : recommendedItems) {
- ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
- }
-
- recommendations.set(recommendedItems);
- ctx.write(userIDWritable, recommendations);
- }
- }
- }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java Thu Mar 21 15:44:17 2013
@@ -41,7 +41,7 @@ public abstract class SharingMapper<K1,V
*/
abstract S createSharedInstance(Context context);
- void setupSharedInstance(Context context) {
+ final void setupSharedInstance(Context context) {
SHARED_INSTANCE = createSharedInstance(context);
}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java?rev=1459365&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java Thu Mar 21 15:44:17 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.als;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.IntObjectProcedure;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
+import org.apache.mahout.math.set.OpenIntHashSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A mapper that loads the feature matrices U and M into memory as a shared instance and afterwards computes
+ * recommendations from these. Can be executed by a {@link MultithreadedSharingMapper}.
+ */
+class SharingPredictionMapper extends SharingMapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable,
+ Pair<OpenIntObjectHashMap<Vector>,OpenIntObjectHashMap<Vector>>> {
+
+ private int recommendationsPerUser;
+ private float maxRating;
+
+ private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
+
+ @Override
+ Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> createSharedInstance(Context ctx) {
+ Path pathToU = new Path(ctx.getConfiguration().get(RecommenderJob.USER_FEATURES_PATH));
+ Path pathToM = new Path(ctx.getConfiguration().get(RecommenderJob.ITEM_FEATURES_PATH));
+
+ OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
+ OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+
+ return new Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>>(U, M);
+ }
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ recommendationsPerUser = ctx.getConfiguration().getInt(RecommenderJob.NUM_RECOMMENDATIONS,
+ RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
+ maxRating = Float.parseFloat(ctx.getConfiguration().get(RecommenderJob.MAX_RATING));
+ }
+
+ @Override
+ protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
+ throws IOException, InterruptedException {
+
+ Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> uAndM = getSharedInstance();
+ OpenIntObjectHashMap<Vector> U = uAndM.getFirst();
+ OpenIntObjectHashMap<Vector> M = uAndM.getSecond();
+
+ Vector ratings = ratingsWritable.get();
+ final int userID = userIDWritable.get();
+ final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
+
+ Iterator<Vector.Element> ratingsIterator = ratings.iterateNonZero();
+ while (ratingsIterator.hasNext()) {
+ alreadyRatedItems.add(ratingsIterator.next().index());
+ }
+
+ final TopItemQueue topItemQueue = new TopItemQueue(recommendationsPerUser);
+ final Vector userFeatures = U.get(userID);
+
+ M.forEachPair(new IntObjectProcedure<Vector>() {
+ @Override
+ public boolean apply(int itemID, Vector itemFeatures) {
+ if (!alreadyRatedItems.contains(itemID)) {
+ double predictedRating = userFeatures.dot(itemFeatures);
+
+ MutableRecommendedItem top = topItemQueue.top();
+ if (predictedRating > top.getValue()) {
+ top.set(itemID, (float) predictedRating);
+ topItemQueue.updateTop();
+ }
+ }
+ return true;
+ }
+ });
+
+ List<RecommendedItem> recommendedItems = topItemQueue.getTopItems();
+
+ if (!recommendedItems.isEmpty()) {
+
+ // cap predictions to maxRating
+ for (RecommendedItem topItem : recommendedItems) {
+ ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
+ }
+
+ recommendations.set(recommendedItems);
+ ctx.write(userIDWritable, recommendations);
+ }
+ }
+}
Modified: mahout/trunk/examples/bin/factorize-movielens-1M.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/factorize-movielens-1M.sh?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/examples/bin/factorize-movielens-1M.sh (original)
+++ mahout/trunk/examples/bin/factorize-movielens-1M.sh Thu Mar 21 15:44:17 2013
@@ -62,7 +62,7 @@ $MAHOUT evaluateFactorization --input ${
# compute recommendations
$MAHOUT recommendfactorized --input ${WORK_DIR}/als/out/userRatings/ --output ${WORK_DIR}/recommendations/ \
--userFeatures ${WORK_DIR}/als/out/U/ --itemFeatures ${WORK_DIR}/als/out/M/ \
- --numRecommendations 6 --maxRating 5
+ --numRecommendations 6 --maxRating 5 --numThreads 2
# print the error
echo -e "\nRMSE is:\n"