You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/03/21 16:44:17 UTC

svn commit: r1459365 - in /mahout/trunk: core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ examples/bin/

Author: ssc
Date: Thu Mar 21 15:44:17 2013
New Revision: 1459365

URL: http://svn.apache.org/r1459365
Log:
MAHOUT-1169 Multithreaded recommendation computation from a factorization

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveExplicitFeedbackMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java
    mahout/trunk/examples/bin/factorize-movielens-1M.sh

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java Thu Mar 21 15:44:17 2013
@@ -24,8 +24,8 @@ import org.apache.hadoop.util.Reflection
 import java.io.IOException;
 
 /**
- * Multithreaded Mapper for {@link SharingMapper}s. Will call before() and after() once in the controlling thread
- * before and after executing the mappers using a thread pool.
+ * Multithreaded Mapper for {@link SharingMapper}s. Will call setupSharedInstance() once in the controlling thread
+ * before executing the mappers using a thread pool.
  *
  * @param <K1>
  * @param <V1>

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Thu Mar 21 15:44:17 2013
@@ -73,7 +73,7 @@ import java.util.Random;
  * <li>--lambda (double): regularization parameter to avoid overfitting</li>
  * <li>--userFeatures (path): path to the user feature matrix</li>
  * <li>--itemFeatures (path): path to the item feature matrix</li>
- * <li>--numThreadsPerSolver (int): threads to use per solver mapper, default: 1 (no multithreading)</li>
+ * <li>--numThreadsPerSolver (int): threads to use per solver mapper (default: 1)</li>
  * </ol>
  */
 public class ParallelALSFactorizationJob extends AbstractJob {
@@ -186,6 +186,9 @@ public class ParallelALSFactorizationJob
       writer = new SequenceFile.Writer(fs, getConf(), new Path(pathToM(-1), "part-m-00000"), IntWritable.class,
           VectorWritable.class);
 
+      IntWritable index = new IntWritable();
+      VectorWritable featureVector = new VectorWritable();
+
       Iterator<Vector.Element> averages = averageRatings.iterateNonZero();
       while (averages.hasNext()) {
         Vector.Element e = averages.next();
@@ -194,7 +197,9 @@ public class ParallelALSFactorizationJob
         for (int m = 1; m < numFeatures; m++) {
           row.setQuick(m, random.nextDouble());
         }
-        writer.append(new IntWritable(e.index()), new VectorWritable(row));
+        index.set(e.index());
+        featureVector.set(row);
+        writer.append(index, featureVector);
       }
     } finally {
       Closeables.closeQuietly(writer);
@@ -230,48 +235,29 @@ public class ParallelALSFactorizationJob
       throws ClassNotFoundException, IOException, InterruptedException {
 
     int iterationNumber = currentIteration + 1;
-    Class<? extends Mapper> solverMapperClass;
     Class<? extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>> solverMapperClassInternal = null;
     String name = null;
-    // no multithreading
-    if (numThreadsPerSolver == 1) {
-
-      if (implicitFeedback) {
-        solverMapperClass = SolveImplicitFeedbackMapper.class;
-        name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
-            "(single-threaded, implicit feedback)";
-      } else {
-        solverMapperClass = SolveExplicitFeedbackMapper.class;
-        name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
-            "(single-threaded, explicit feedback)";
-      }
 
+    if (implicitFeedback) {
+      solverMapperClassInternal = SharingSolveImplicitFeedbackMapper.class;
+      name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
+          "(" + numThreadsPerSolver + " threads, implicit feedback)";
     } else {
-      solverMapperClass = MultithreadedSharingMapper.class;
-
-      if (implicitFeedback) {
-        solverMapperClassInternal = SharingSolveImplicitFeedbackMapper.class;
-        name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
-            "(" + numThreadsPerSolver + " threads, implicit feedback)";
-      } else {
-        solverMapperClassInternal = SharingSolveExplicitFeedbackMapper.class;
-        name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
-            "(" + numThreadsPerSolver + " threads, explicit feedback)";
-      }
+      solverMapperClassInternal = SharingSolveExplicitFeedbackMapper.class;
+      name = "Recompute " + matrixName +", iteration (" + iterationNumber + "/" + numIterations + "), " +
+          "(" + numThreadsPerSolver + " threads, explicit feedback)";
     }
 
-    Job solverForUorI = prepareJob(ratings, output, SequenceFileInputFormat.class, solverMapperClass, IntWritable.class,
-        VectorWritable.class, SequenceFileOutputFormat.class, name);
+    Job solverForUorI = prepareJob(ratings, output, SequenceFileInputFormat.class, MultithreadedSharingMapper.class,
+        IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, name);
     Configuration solverConf = solverForUorI.getConfiguration();
     solverConf.set(LAMBDA, String.valueOf(lambda));
     solverConf.set(ALPHA, String.valueOf(alpha));
     solverConf.setInt(NUM_FEATURES, numFeatures);
     solverConf.set(FEATURE_MATRIX, pathToUorI.toString());
 
-    if (numThreadsPerSolver > 1) {
-      MultithreadedMapper.setMapperClass(solverForUorI, solverMapperClassInternal);
-      MultithreadedMapper.setNumberOfThreads(solverForUorI, numThreadsPerSolver);
-    }
+    MultithreadedMapper.setMapperClass(solverForUorI, solverMapperClassInternal);
+    MultithreadedMapper.setNumberOfThreads(solverForUorI, numThreadsPerSolver);
 
     boolean succeeded = solverForUorI.waitForCompletion(true);
     if (!succeeded) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Thu Mar 21 15:44:17 2013
@@ -18,24 +18,15 @@
 package org.apache.mahout.cf.taste.hadoop.als;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.IntObjectProcedure;
-import org.apache.mahout.math.map.OpenIntObjectHashMap;
-import org.apache.mahout.math.set.OpenIntHashSet;
 
-import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -47,17 +38,17 @@ import java.util.Map;
  * <ol>
  * <li>--input (path): Directory containing the vectorized user ratings</li>
  * <li>--output (path): path where output should go</li>
- * <li>--numRecommendations (int): maximum number of recommendations per user</li>
+ * <li>--numRecommendations (int): maximum number of recommendations per user (default: 10)</li>
  * <li>--maxRating (double): maximum rating of an item</li>
- * <li>--NUM_FEATURES (int): number of features to use for decomposition </li>
+ * <li>--numThreads (int): threads to use per mapper (default: 1)</li>
  * </ol>
  */
 public class RecommenderJob extends AbstractJob {
 
-  private static final String NUM_RECOMMENDATIONS = RecommenderJob.class.getName() + ".numRecommendations";
-  private static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
-  private static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
-  private static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
+  static final String NUM_RECOMMENDATIONS = RecommenderJob.class.getName() + ".numRecommendations";
+  static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
+  static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
+  static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
 
   static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
 
@@ -74,6 +65,7 @@ public class RecommenderJob extends Abst
     addOption("numRecommendations", null, "number of recommendations per user",
         String.valueOf(DEFAULT_NUM_RECOMMENDATIONS));
     addOption("maxRating", null, "maximum rating available", true);
+    addOption("numThreads", null, "threads per mapper", String.valueOf(1));
     addOutputOption();
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -81,15 +73,20 @@ public class RecommenderJob extends Abst
       return -1;
     }
 
-    Job prediction = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class, PredictionMapper.class,
-        IntWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
+    Job prediction = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class,
+        MultithreadedSharingMapper.class, IntWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
     Configuration conf = prediction.getConfiguration();
 
+    int numThreads = Integer.parseInt(getOption("numThreads"));
+
     conf.setInt(NUM_RECOMMENDATIONS, Integer.parseInt(getOption("numRecommendations")));
     conf.set(USER_FEATURES_PATH, getOption("userFeatures"));
     conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
     conf.set(MAX_RATING, getOption("maxRating"));
 
+    MultithreadedMapper.setMapperClass(prediction, SharingPredictionMapper.class);
+    MultithreadedMapper.setNumberOfThreads(prediction, numThreads);
+
     boolean succeeded = prediction.waitForCompletion(true);
     if (!succeeded) {
       return -1;
@@ -98,85 +95,4 @@ public class RecommenderJob extends Abst
     return 0;
   }
 
-  static class PredictionMapper
-      extends Mapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable> {
-
-    private OpenIntObjectHashMap<Vector> U;
-    private OpenIntObjectHashMap<Vector> M;
-
-    private int recommendationsPerUser;
-    private float maxRating;
-
-    private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
-
-    @Override
-    protected void setup(Context ctx) throws IOException, InterruptedException {
-      recommendationsPerUser = ctx.getConfiguration().getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
-
-      Path pathToU = new Path(ctx.getConfiguration().get(USER_FEATURES_PATH));
-      Path pathToM = new Path(ctx.getConfiguration().get(ITEM_FEATURES_PATH));
-
-      U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
-      M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
-
-      maxRating = Float.parseFloat(ctx.getConfiguration().get(MAX_RATING));
-    }
-
-    // we can use a simple dot product computation, as both vectors are dense
-    private static double dot(Vector x, Vector y) {
-      int numFeatures = x.size();
-      double sum = 0;
-      for (int n = 0; n < numFeatures; n++) {
-        sum += x.getQuick(n) * y.getQuick(n);
-      }
-      return sum;
-    }
-
-    @Override
-    protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
-        throws IOException, InterruptedException {
-
-      Vector ratings = ratingsWritable.get();
-      final int userID = userIDWritable.get();
-      final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
-
-      Iterator<Vector.Element> ratingsIterator = ratings.iterateNonZero();
-      while (ratingsIterator.hasNext()) {
-        alreadyRatedItems.add(ratingsIterator.next().index());
-      }
-
-      final TopItemQueue topItemQueue = new TopItemQueue(recommendationsPerUser);
-      final Vector userFeatures = U.get(userID);
-
-      M.forEachPair(new IntObjectProcedure<Vector>() {
-        @Override
-        public boolean apply(int itemID, Vector itemFeatures) {
-          if (!alreadyRatedItems.contains(itemID)) {
-            double predictedRating = dot(userFeatures, itemFeatures);
-
-            MutableRecommendedItem top = topItemQueue.top();
-            if (predictedRating > top.getValue()) {
-              top.set(itemID, (float) predictedRating);
-              topItemQueue.updateTop();
-            }
-
-          }
-          return true;
-        }
-      });
-
-      List<RecommendedItem> recommendedItems = topItemQueue.getTopItems();
-
-      if (!recommendedItems.isEmpty()) {
-
-        // cap predictions to maxRating
-        for (RecommendedItem topItem : recommendedItems) {
-          ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
-        }
-
-        recommendations.set(recommendedItems);
-        ctx.write(userIDWritable, recommendations);
-      }
-    }
-  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingMapper.java Thu Mar 21 15:44:17 2013
@@ -41,7 +41,7 @@ public abstract class SharingMapper<K1,V
    */
   abstract S createSharedInstance(Context context);
 
-  void setupSharedInstance(Context context) {
+  final void setupSharedInstance(Context context) {
     SHARED_INSTANCE = createSharedInstance(context);
   }
 

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java?rev=1459365&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SharingPredictionMapper.java Thu Mar 21 15:44:17 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.als;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.IntObjectProcedure;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
+import org.apache.mahout.math.set.OpenIntHashSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A multithreaded mapper that loads the feature matrices U and M into memory. Afterwards it computes recommendations
+ * from these. Can be executed by a {@link MultithreadedSharingMapper}.
+ */
+class SharingPredictionMapper extends SharingMapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable,
+    Pair<OpenIntObjectHashMap<Vector>,OpenIntObjectHashMap<Vector>>> {
+
+  private int recommendationsPerUser;
+  private float maxRating;
+
+  private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
+
+  @Override
+  Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> createSharedInstance(Context ctx) {
+    Path pathToU = new Path(ctx.getConfiguration().get(RecommenderJob.USER_FEATURES_PATH));
+    Path pathToM = new Path(ctx.getConfiguration().get(RecommenderJob.ITEM_FEATURES_PATH));
+
+    OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
+    OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+
+    return new Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>>(U, M);
+  }
+
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    recommendationsPerUser = ctx.getConfiguration().getInt(RecommenderJob.NUM_RECOMMENDATIONS,
+      RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
+    maxRating = Float.parseFloat(ctx.getConfiguration().get(RecommenderJob.MAX_RATING));
+  }
+
+  @Override
+  protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
+      throws IOException, InterruptedException {
+
+    Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> uAndM = getSharedInstance();
+    OpenIntObjectHashMap<Vector> U = uAndM.getFirst();
+    OpenIntObjectHashMap<Vector> M = uAndM.getSecond();
+
+    Vector ratings = ratingsWritable.get();
+    final int userID = userIDWritable.get();
+    final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
+
+    Iterator<Vector.Element> ratingsIterator = ratings.iterateNonZero();
+    while (ratingsIterator.hasNext()) {
+      alreadyRatedItems.add(ratingsIterator.next().index());
+    }
+
+    final TopItemQueue topItemQueue = new TopItemQueue(recommendationsPerUser);
+    final Vector userFeatures = U.get(userID);
+
+    M.forEachPair(new IntObjectProcedure<Vector>() {
+      @Override
+      public boolean apply(int itemID, Vector itemFeatures) {
+        if (!alreadyRatedItems.contains(itemID)) {
+          double predictedRating = userFeatures.dot(itemFeatures);
+
+          MutableRecommendedItem top = topItemQueue.top();
+          if (predictedRating > top.getValue()) {
+            top.set(itemID, (float) predictedRating);
+            topItemQueue.updateTop();
+          }
+        }
+        return true;
+      }
+    });
+
+    List<RecommendedItem> recommendedItems = topItemQueue.getTopItems();
+
+    if (!recommendedItems.isEmpty()) {
+
+      // cap predictions to maxRating
+      for (RecommendedItem topItem : recommendedItems) {
+        ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
+      }
+
+      recommendations.set(recommendedItems);
+      ctx.write(userIDWritable, recommendations);
+    }
+  }
+}

Modified: mahout/trunk/examples/bin/factorize-movielens-1M.sh
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/bin/factorize-movielens-1M.sh?rev=1459365&r1=1459364&r2=1459365&view=diff
==============================================================================
--- mahout/trunk/examples/bin/factorize-movielens-1M.sh (original)
+++ mahout/trunk/examples/bin/factorize-movielens-1M.sh Thu Mar 21 15:44:17 2013
@@ -62,7 +62,7 @@ $MAHOUT evaluateFactorization --input ${
 # compute recommendations
 $MAHOUT recommendfactorized --input ${WORK_DIR}/als/out/userRatings/ --output ${WORK_DIR}/recommendations/ \
     --userFeatures ${WORK_DIR}/als/out/U/ --itemFeatures ${WORK_DIR}/als/out/M/ \
-    --numRecommendations 6 --maxRating 5
+    --numRecommendations 6 --maxRating 5 --numThreads 2
 
 # print the error
 echo -e "\nRMSE is:\n"