Posted to commits@mahout.apache.org by ss...@apache.org on 2013/06/08 09:28:25 UTC

svn commit: r1490930 - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/cf/taste/hadoop/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ core/src/main/java/org/apache/mahout/...

Author: ssc
Date: Sat Jun  8 07:28:24 2013
New Revision: 1490930

URL: http://svn.apache.org/r1490930
Log:
MAHOUT-974 org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob: use integers as userId and itemId
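
The new tests below drive this end to end. As a minimal sketch, assuming a hypothetical driver class and placeholder paths, the factorization can now run directly on tab- or comma-separated "userID,itemID,rating" lines whose IDs are 64-bit longs:

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob;

// Hypothetical driver; all paths are placeholders.
public class FactorizeWithLongIDs {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new ParallelALSFactorizationJob(), new String[] {
        "--input", "/path/to/prefs.txt",
        "--output", "/path/to/als",
        "--tempDir", "/path/to/tmp",
        "--lambda", "0.065",
        "--numFeatures", "3",
        "--numIterations", "5",
        "--numThreadsPerSolver", "1",
        "--usesLongIDs", "true" });   // new in this commit
    // besides U/ and M/, the job now also writes userIDIndex/ and itemIDIndex/
  }
}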

Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Sat Jun  8 07:28:24 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.8 - unreleased
 
+  MAHOUT-974: org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob: use integers as userId and itemId (ssc)
+
   MAHOUT-1052: Add an option to MinHashDriver that specifies the dimension of vector to hash (indexes or values) (Elena Smirnova via smarthi)
 
   MAHOUT-1237: Total cluster cost isn't computed properly (dfilimon)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java Sat Jun  8 07:28:24 2013
@@ -45,6 +45,10 @@ public class MutableRecommendedItem impl
     return value;
   }
 
+  public void setItemID(long itemID) {
+    this.itemID = itemID;
+  }
+
   public void set(long itemID, float value) {
     this.itemID = itemID;
     this.value = value;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Sat Jun  8 07:28:24 2013
@@ -35,11 +35,13 @@ import java.util.regex.Pattern;
  */
 public final class TasteHadoopUtils {
 
+  public static final int USER_ID_POS = 0;
+  public static final int ITEM_ID_POS = 1;
+
   /** Standard delimiter of textual preference data */
   private static final Pattern PREFERENCE_TOKEN_DELIMITER = Pattern.compile("[\t,]");
 
-  private TasteHadoopUtils() {
-  }
+  private TasteHadoopUtils() {}
 
   /**
    * Splits a preference data line into string tokens
@@ -55,12 +57,18 @@ public final class TasteHadoopUtils {
     return 0x7FFFFFFF & Longs.hashCode(id);
   }
 
+  public static int readID(String token, boolean usesLongIDs) {
+    return usesLongIDs ?
+        TasteHadoopUtils.idToIndex(Long.parseLong(token))
+        : Integer.parseInt(token);
+  }
+
   /**
    * Reads a binary mapping file
    */
-  public static OpenIntLongHashMap readItemIDIndexMap(String itemIDIndexPathStr, Configuration conf) {
-    OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
-    Path itemIDIndexPath = new Path(itemIDIndexPathStr);
+  public static OpenIntLongHashMap readIDIndexMap(String idIndexPathStr, Configuration conf) {
+    OpenIntLongHashMap indexIDMap = new OpenIntLongHashMap();
+    Path itemIDIndexPath = new Path(idIndexPathStr);
     for (Pair<VarIntWritable,VarLongWritable> record
          : new SequenceFileDirIterable<VarIntWritable,VarLongWritable>(itemIDIndexPath,
                                                                        PathType.LIST,
@@ -68,9 +76,9 @@ public final class TasteHadoopUtils {
                                                                        null,
                                                                        true,
                                                                        conf)) {
-      indexItemIDMap.put(record.getFirst().get(), record.getSecond().get());
+      indexIDMap.put(record.getFirst().get(), record.getSecond().get());
     }
-    return indexItemIDMap;
+    return indexIDMap;
   }
 
 

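For reference, a short sketch of the two paths through the new TasteHadoopUtils.readID helper (hypothetical wrapper class, illustrative literals):

import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;

// Hypothetical snippet; the literal IDs are illustrative only.
public class ReadIDSketch {
  public static void main(String[] args) {
    // usesLongIDs == false: the token is parsed directly as an int, as before
    int plain = TasteHadoopUtils.readID("123", false);

    // usesLongIDs == true: the token is parsed as a long and folded into a
    // non-negative int index via idToIndex()
    int index = TasteHadoopUtils.readID("5568227754922264005", true);

    // idToIndex() masks the sign bit of the long's hash, so distinct long IDs
    // can land on the same index; the ID-index side files written by the
    // factorization job keep a long ID per index for translating back
    System.out.println(plain + " " + index);
  }
}
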
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java Sat Jun  8 07:28:24 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,6 +76,7 @@ public class FactorizationEvaluator exte
     addInputOption();
     addOption("userFeatures", null, "path to the user feature matrix", true);
     addOption("itemFeatures", null, "path to the item feature matrix", true);
+    addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
     addOutputOption();
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -87,8 +89,16 @@ public class FactorizationEvaluator exte
     Job predictRatings = prepareJob(getInputPath(), errors, TextInputFormat.class, PredictRatingsMapper.class,
         DoubleWritable.class, NullWritable.class, SequenceFileOutputFormat.class);
 
-    predictRatings.getConfiguration().set(USER_FEATURES_PATH, getOption("userFeatures"));
-    predictRatings.getConfiguration().set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
+    Configuration conf = predictRatings.getConfiguration();
+    conf.set(USER_FEATURES_PATH, getOption("userFeatures"));
+    conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
+
+    boolean usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs"));
+    if (usesLongIDs) {
+      conf.set(ParallelALSFactorizationJob.USES_LONG_IDS, String.valueOf(true));
+    }
+
+
     boolean succeeded = predictRatings.waitForCompletion(true);
     if (!succeeded) {
       return -1;
@@ -125,27 +135,36 @@ public class FactorizationEvaluator exte
     private OpenIntObjectHashMap<Vector> U;
     private OpenIntObjectHashMap<Vector> M;
 
+    private boolean usesLongIDs;
+
+    private final DoubleWritable error = new DoubleWritable();
+
     @Override
     protected void setup(Context ctx) throws IOException, InterruptedException {
-      Path pathToU = new Path(ctx.getConfiguration().get(USER_FEATURES_PATH));
-      Path pathToM = new Path(ctx.getConfiguration().get(ITEM_FEATURES_PATH));
+      Configuration conf = ctx.getConfiguration();
+
+      Path pathToU = new Path(conf.get(USER_FEATURES_PATH));
+      Path pathToM = new Path(conf.get(ITEM_FEATURES_PATH));
 
-      U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
-      M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+      U = ALS.readMatrixByRows(pathToU, conf);
+      M = ALS.readMatrixByRows(pathToM, conf);
+
+      usesLongIDs = conf.getBoolean(ParallelALSFactorizationJob.USES_LONG_IDS, false);
     }
 
     @Override
     protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
 
       String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
-      int userID = Integer.parseInt(tokens[0]);
-      int itemID = Integer.parseInt(tokens[1]);
+
+      int userID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.USER_ID_POS], usesLongIDs);
+      int itemID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.ITEM_ID_POS], usesLongIDs);
       double rating = Double.parseDouble(tokens[2]);
 
       if (U.containsKey(userID) && M.containsKey(itemID)) {
         double estimate = U.get(userID).dot(M.get(itemID));
-        double err = rating - estimate;
-        ctx.write(new DoubleWritable(err), NullWritable.get());
+        error.set(rating - estimate);
+        ctx.write(error, NullWritable.get());
       }
     }
   }

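With the new flag, held-out ratings with long IDs get hashed the same way they were during factorization before being looked up in U and M. A minimal sketch of invoking the evaluator, assuming placeholder paths:

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.als.FactorizationEvaluator;

// Hypothetical driver; all paths are placeholders.
public class EvaluateWithLongIDs {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new FactorizationEvaluator(), new String[] {
        "--input", "/path/to/heldout-prefs.txt",
        "--userFeatures", "/path/to/als/U/",
        "--itemFeatures", "/path/to/als/M/",
        "--usesLongIDs", "true",   // new in this commit
        "--output", "/path/to/rmse" });
  }
}
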
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Sat Jun  8 07:28:24 2013
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.cf.taste.hadoop.als;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -48,6 +49,8 @@ import org.apache.mahout.common.mapreduc
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
@@ -88,12 +91,16 @@ public class ParallelALSFactorizationJob
   static final String ALPHA = ParallelALSFactorizationJob.class.getName() + ".alpha";
   static final String NUM_ENTITIES = ParallelALSFactorizationJob.class.getName() + ".numEntities";
 
+  static final String USES_LONG_IDS = ParallelALSFactorizationJob.class.getName() + ".usesLongIDs";
+  static final String TOKEN_POS = ParallelALSFactorizationJob.class.getName() + ".tokenPos";
+
   private boolean implicitFeedback;
   private int numIterations;
   private int numFeatures;
   private double lambda;
   private double alpha;
   private int numThreadsPerSolver;
+  private boolean usesLongIDs;
 
   private int numItems;
   private int numUsers;
@@ -115,6 +122,7 @@ public class ParallelALSFactorizationJob
     addOption("numFeatures", null, "dimension of the feature space", true);
     addOption("numIterations", null, "number of iterations", true);
     addOption("numThreadsPerSolver", null, "threads per solver mapper", String.valueOf(1));
+    addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
@@ -128,6 +136,7 @@ public class ParallelALSFactorizationJob
     implicitFeedback = Boolean.parseBoolean(getOption("implicitFeedback"));
 
     numThreadsPerSolver = Integer.parseInt(getOption("numThreadsPerSolver"));
+    usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs", String.valueOf(false)));
 
     /*
     * compute the factorization A = U M'
@@ -137,12 +146,27 @@ public class ParallelALSFactorizationJob
     *           M (items x features) is the representation of items in the feature space
     */
 
+    if (usesLongIDs) {
+      Job mapUsers = prepareJob(getInputPath(), getOutputPath("userIDIndex"), TextInputFormat.class,
+          MapLongIDsMapper.class, VarIntWritable.class, VarLongWritable.class, IDMapReducer.class,
+          VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+      mapUsers.getConfiguration().set(TOKEN_POS, String.valueOf(TasteHadoopUtils.USER_ID_POS));
+      mapUsers.waitForCompletion(true);
+
+      Job mapItems = prepareJob(getInputPath(), getOutputPath("itemIDIndex"), TextInputFormat.class,
+          MapLongIDsMapper.class, VarIntWritable.class, VarLongWritable.class, IDMapReducer.class,
+          VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+      mapItems.getConfiguration().set(TOKEN_POS, String.valueOf(TasteHadoopUtils.ITEM_ID_POS));
+      mapItems.waitForCompletion(true);
+    }
+
    /* create A' */
     Job itemRatings = prepareJob(getInputPath(), pathToItemRatings(),
         TextInputFormat.class, ItemRatingVectorsMapper.class, IntWritable.class,
         VectorWritable.class, VectorSumReducer.class, IntWritable.class,
         VectorWritable.class, SequenceFileOutputFormat.class);
     itemRatings.setCombinerClass(VectorSumCombiner.class);
+    itemRatings.getConfiguration().set(USES_LONG_IDS, String.valueOf(usesLongIDs));
     boolean succeeded = itemRatings.waitForCompletion(true);
     if (!succeeded) {
       return -1;
@@ -267,11 +291,18 @@ public class ParallelALSFactorizationJob
     private final VectorWritable ratingsWritable = new VectorWritable(true);
     private final Vector ratings = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
 
+    private boolean usesLongIDs;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      usesLongIDs = ctx.getConfiguration().getBoolean(USES_LONG_IDS, false);
+    }
+
     @Override
     protected void map(LongWritable offset, Text line, Context ctx) throws IOException, InterruptedException {
       String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
-      int userID = Integer.parseInt(tokens[0]);
-      int itemID = Integer.parseInt(tokens[1]);
+      int userID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.USER_ID_POS], usesLongIDs);
+      int itemID = TasteHadoopUtils.readID(tokens[TasteHadoopUtils.ITEM_ID_POS], usesLongIDs);
       float rating = Float.parseFloat(tokens[2]);
 
       ratings.setQuick(userID, rating);
@@ -354,6 +385,38 @@ public class ParallelALSFactorizationJob
     }
   }
 
+  static class MapLongIDsMapper extends Mapper<LongWritable,Text,VarIntWritable,VarLongWritable> {
+
+    private int tokenPos;
+    private VarIntWritable index = new VarIntWritable();
+    private VarLongWritable idWritable = new VarLongWritable();
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      tokenPos = ctx.getConfiguration().getInt(TOKEN_POS, -1);
+      Preconditions.checkState(tokenPos >= 0);
+    }
+
+    @Override
+    protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException {
+      String[] tokens = TasteHadoopUtils.splitPrefTokens(line.toString());
+
+      long id = Long.parseLong(tokens[tokenPos]);
+
+      index.set(TasteHadoopUtils.idToIndex(id));
+      idWritable.set(id);
+      ctx.write(index, idWritable);
+    }
+  }
+
+  static class IDMapReducer extends Reducer<VarIntWritable,VarLongWritable,VarIntWritable,VarLongWritable> {
+    @Override
+    protected void reduce(VarIntWritable index, Iterable<VarLongWritable> ids, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(index, ids.iterator().next());
+    }
+  }
+
   private Path pathToM(int iteration) {
     return iteration == numIterations - 1 ? getOutputPath("M") : getTempPath("M-" + iteration);
   }

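The two preprocessing jobs above materialize the index-to-ID mappings as SequenceFiles. Note that IDMapReducer keeps only the first long ID it sees per index, so IDs whose hashes collide are resolved silently. A minimal sketch of translating an index back to its long ID, assuming a placeholder path and an illustrative ID:

import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.math.map.OpenIntLongHashMap;

// Hypothetical snippet; the path and the literal ID are placeholders.
public class IndexLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    OpenIntLongHashMap userIDIndex =
        TasteHadoopUtils.readIDIndexMap("/path/to/als/userIDIndex", conf);

    // recover the original long user ID for a row index of U
    long userID = userIDIndex.get(TasteHadoopUtils.idToIndex(5568227754922264005L));
    System.out.println(userID);
  }
}
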
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java Sat Jun  8 07:28:24 2013
@@ -17,16 +17,20 @@
 
 package org.apache.mahout.cf.taste.hadoop.als;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.function.IntObjectProcedure;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
 import org.apache.mahout.math.map.OpenIntObjectHashMap;
 import org.apache.mahout.math.set.OpenIntHashSet;
 
@@ -37,34 +41,47 @@ import java.util.List;
  * a multithreaded mapper that loads the feature matrices U and M into memory. Afterwards it computes recommendations
  * from these. Can be executed by a {@link MultithreadedSharingMapper}.
  */
-public class PredictionMapper extends SharingMapper<IntWritable,VectorWritable,IntWritable,RecommendedItemsWritable,
+public class PredictionMapper extends SharingMapper<IntWritable,VectorWritable,LongWritable,RecommendedItemsWritable,
     Pair<OpenIntObjectHashMap<Vector>,OpenIntObjectHashMap<Vector>>> {
 
   private int recommendationsPerUser;
   private float maxRating;
 
+  private boolean usesLongIDs;
+  private OpenIntLongHashMap userIDIndex;
+  private OpenIntLongHashMap itemIDIndex;
+
+  private final LongWritable userIDWritable = new LongWritable();
   private final RecommendedItemsWritable recommendations = new RecommendedItemsWritable();
 
   @Override
   Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> createSharedInstance(Context ctx) {
-    Path pathToU = new Path(ctx.getConfiguration().get(RecommenderJob.USER_FEATURES_PATH));
-    Path pathToM = new Path(ctx.getConfiguration().get(RecommenderJob.ITEM_FEATURES_PATH));
+    Configuration conf = ctx.getConfiguration();
+    Path pathToU = new Path(conf.get(RecommenderJob.USER_FEATURES_PATH));
+    Path pathToM = new Path(conf.get(RecommenderJob.ITEM_FEATURES_PATH));
 
-    OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, ctx.getConfiguration());
-    OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, ctx.getConfiguration());
+    OpenIntObjectHashMap<Vector> U = ALS.readMatrixByRows(pathToU, conf);
+    OpenIntObjectHashMap<Vector> M = ALS.readMatrixByRows(pathToM, conf);
 
     return new Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>>(U, M);
   }
 
   @Override
   protected void setup(Context ctx) throws IOException, InterruptedException {
-    recommendationsPerUser = ctx.getConfiguration().getInt(RecommenderJob.NUM_RECOMMENDATIONS,
-      RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
-    maxRating = Float.parseFloat(ctx.getConfiguration().get(RecommenderJob.MAX_RATING));
+    Configuration conf = ctx.getConfiguration();
+    recommendationsPerUser = conf.getInt(RecommenderJob.NUM_RECOMMENDATIONS,
+        RecommenderJob.DEFAULT_NUM_RECOMMENDATIONS);
+    maxRating = Float.parseFloat(conf.get(RecommenderJob.MAX_RATING));
+
+    usesLongIDs = conf.getBoolean(ParallelALSFactorizationJob.USES_LONG_IDS, false);
+    if (usesLongIDs) {
+      userIDIndex = TasteHadoopUtils.readIDIndexMap(conf.get(RecommenderJob.USER_INDEX_PATH), conf);
+      itemIDIndex = TasteHadoopUtils.readIDIndexMap(conf.get(RecommenderJob.ITEM_INDEX_PATH), conf);
+    }
   }
 
   @Override
-  protected void map(IntWritable userIDWritable, VectorWritable ratingsWritable, Context ctx)
+  protected void map(IntWritable userIndexWritable, VectorWritable ratingsWritable, Context ctx)
     throws IOException, InterruptedException {
 
     Pair<OpenIntObjectHashMap<Vector>, OpenIntObjectHashMap<Vector>> uAndM = getSharedInstance();
@@ -72,7 +89,7 @@ public class PredictionMapper extends Sh
     OpenIntObjectHashMap<Vector> M = uAndM.getSecond();
 
     Vector ratings = ratingsWritable.get();
-    final int userID = userIDWritable.get();
+    final int userIndex = userIndexWritable.get();
     final OpenIntHashSet alreadyRatedItems = new OpenIntHashSet(ratings.getNumNondefaultElements());
 
     for (Vector.Element e : ratings.nonZeroes()) {
@@ -80,7 +97,7 @@ public class PredictionMapper extends Sh
     }
 
     final TopItemsQueue topItemsQueue = new TopItemsQueue(recommendationsPerUser);
-    final Vector userFeatures = U.get(userID);
+    final Vector userFeatures = U.get(userIndex);
 
     M.forEachPair(new IntObjectProcedure<Vector>() {
       @Override
@@ -107,6 +124,20 @@ public class PredictionMapper extends Sh
         ((MutableRecommendedItem) topItem).capToMaxValue(maxRating);
       }
 
+      if (usesLongIDs) {
+        long userID = userIDIndex.get(userIndex);
+        userIDWritable.set(userID);
+
+        for (RecommendedItem topItem : recommendedItems) {
+          // remap item IDs
+          long itemID = itemIDIndex.get((int) topItem.getItemID());
+          ((MutableRecommendedItem) topItem).setItemID(itemID);
+        }
+
+      } else {
+        userIDWritable.set(userIndex);
+      }
+
       recommendations.set(recommendedItems);
       ctx.write(userIDWritable, recommendations);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/RecommenderJob.java Sat Jun  8 07:28:24 2013
@@ -49,6 +49,8 @@ public class RecommenderJob extends Abst
   static final String USER_FEATURES_PATH = RecommenderJob.class.getName() + ".userFeatures";
   static final String ITEM_FEATURES_PATH = RecommenderJob.class.getName() + ".itemFeatures";
   static final String MAX_RATING = RecommenderJob.class.getName() + ".maxRating";
+  static final String USER_INDEX_PATH = RecommenderJob.class.getName() + ".userIndex";
+  static final String ITEM_INDEX_PATH = RecommenderJob.class.getName() + ".itemIndex";
 
   static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
 
@@ -66,6 +68,9 @@ public class RecommenderJob extends Abst
         String.valueOf(DEFAULT_NUM_RECOMMENDATIONS));
     addOption("maxRating", null, "maximum rating available", true);
     addOption("numThreads", null, "threads per mapper", String.valueOf(1));
+    addOption("usesLongIDs", null, "input contains long IDs that need to be translated");
+    addOption("userIDIndex", null, "index for user long IDs (necessary if usesLongIDs is true)");
+    addOption("itemIDIndex", null, "index for user long IDs (necessary if usesLongIDs is true)");
     addOutputOption();
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -84,6 +89,13 @@ public class RecommenderJob extends Abst
     conf.set(ITEM_FEATURES_PATH, getOption("itemFeatures"));
     conf.set(MAX_RATING, getOption("maxRating"));
 
+    boolean usesLongIDs = Boolean.parseBoolean(getOption("usesLongIDs"));
+    if (usesLongIDs) {
+      conf.set(ParallelALSFactorizationJob.USES_LONG_IDS, String.valueOf(true));
+      conf.set(USER_INDEX_PATH, getOption("userIDIndex"));
+      conf.set(ITEM_INDEX_PATH, getOption("itemIDIndex"));
+    }
+
     MultithreadedMapper.setMapperClass(prediction, PredictionMapper.class);
     MultithreadedMapper.setNumberOfThreads(prediction, numThreads);
 

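Mirroring the new recommenderJobWithIDMapping test, a minimal sketch of wiring the index files into the recommender (hypothetical driver, placeholder paths):

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.als.RecommenderJob;

// Hypothetical driver; paths mirror the layout the factorization job writes.
public class RecommendWithLongIDs {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new RecommenderJob(), new String[] {
        "--input", "/path/to/als/userRatings/",
        "--userFeatures", "/path/to/als/U/",
        "--itemFeatures", "/path/to/als/M/",
        "--numRecommendations", "2",
        "--maxRating", "5.0",
        "--numThreads", "2",
        "--usesLongIDs", "true",
        "--userIDIndex", "/path/to/als/userIDIndex/",
        "--itemIDIndex", "/path/to/als/itemIDIndex/",
        "--output", "/path/to/recommendations" });
  }
}
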
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Sat Jun  8 07:28:24 2013
@@ -76,7 +76,7 @@ public final class AggregateAndRecommend
     Configuration conf = context.getConfiguration();
     recommendationsPerUser = conf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
     booleanData = conf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
-    indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf);
+    indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf);
 
     String itemFilePathString = conf.get(ITEMS_FILE);
     if (itemFilePathString != null) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Sat Jun  8 07:28:24 2013
@@ -184,7 +184,7 @@ public final class ItemSimilarityJob ext
     protected void setup(Context ctx) {
       Configuration conf = ctx.getConfiguration();
       maxSimilarItemsPerItem = conf.getInt(MAX_SIMILARITIES_PER_ITEM, -1);
-      indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ITEM_ID_INDEX_PATH_STR), conf);
+      indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEM_ID_INDEX_PATH_STR), conf);
 
       Preconditions.checkArgument(maxSimilarItemsPerItem > 0, "maxSimilarItemsPerItem was not correctly set!");
     }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJobTest.java Sat Jun  8 07:28:24 2013
@@ -19,6 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.impl.TasteTestCase;
 import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
 import org.apache.mahout.cf.taste.impl.common.RunningAverage;
@@ -29,6 +30,8 @@ import org.apache.mahout.math.SparseRowM
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -41,6 +44,7 @@ public class ParallelALSFactorizationJob
   private static final Logger log = LoggerFactory.getLogger(ParallelALSFactorizationJobTest.class);
 
   private File inputFile;
+  private File intermediateDir;
   private File outputDir;
   private File tmpDir;
   private Configuration conf;
@@ -50,6 +54,8 @@ public class ParallelALSFactorizationJob
   public void setUp() throws Exception {
     super.setUp();
     inputFile = getTestTempFile("prefs.txt");
+    intermediateDir = getTestTempDir("intermediate");
+    intermediateDir.delete();
     outputDir = getTestTempDir("output");
     outputDir.delete();
     tmpDir = getTestTempDir("tmp");
@@ -225,6 +231,74 @@ public class ParallelALSFactorizationJob
     assertTrue(rmse < 0.4);
   }
 
+  @Test
+  public void exampleWithIDMapping() throws Exception {
+
+    String[] preferencesWithLongIDs = new String[] {
+        "5568227754922264005,-4758971626494767444,5.0",
+        "5568227754922264005,3688396615879561990,5.0",
+        "5568227754922264005,4594226737871995304,2.0",
+        "550945997885173934,-4758971626494767444,2.0",
+        "550945997885173934,4594226737871995304,3.0",
+        "550945997885173934,706816485922781596,5.0",
+        "2448095297482319463,3688396615879561990,5.0",
+        "2448095297482319463,706816485922781596,3.0",
+        "6839920411763636962,-4758971626494767444,3.0",
+        "6839920411763636962,706816485922781596,5.0" };
+
+    writeLines(inputFile, preferencesWithLongIDs);
+
+    ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+    alsFactorization.setConf(conf);
+
+    int numFeatures = 3;
+    int numIterations = 5;
+    double lambda = 0.065;
+
+    alsFactorization.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--tempDir", tmpDir.getAbsolutePath(), "--lambda", String.valueOf(lambda),
+        "--numFeatures", String.valueOf(numFeatures), "--numIterations", String.valueOf(numIterations),
+        "--numThreadsPerSolver", String.valueOf(1), "--usesLongIDs", String.valueOf(true) });
+
+
+    OpenIntLongHashMap userIDIndex =
+        TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/userIDIndex/part-r-00000", conf);
+    assertEquals(4, userIDIndex.size());
+
+    OpenIntLongHashMap itemIDIndex =
+        TasteHadoopUtils.readIDIndexMap(outputDir.getAbsolutePath() + "/itemIDIndex/part-r-00000", conf);
+    assertEquals(4, itemIDIndex.size());
+
+    OpenIntObjectHashMap<Vector> u =
+        MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "U/part-m-00000"));
+    OpenIntObjectHashMap<Vector> m =
+        MathHelper.readMatrixRows(conf, new Path(outputDir.getAbsolutePath(), "M/part-m-00000"));
+
+    assertEquals(4, u.size());
+    assertEquals(4, m.size());
+
+    RunningAverage avg = new FullRunningAverage();
+    for (String line : preferencesWithLongIDs) {
+      String[] tokens = TasteHadoopUtils.splitPrefTokens(line);
+      long userID = Long.parseLong(tokens[TasteHadoopUtils.USER_ID_POS]);
+      long itemID = Long.parseLong(tokens[TasteHadoopUtils.ITEM_ID_POS]);
+      double rating = Double.parseDouble(tokens[2]);
+
+      Vector userFeatures = u.get(TasteHadoopUtils.idToIndex(userID));
+      Vector itemFeatures = m.get(TasteHadoopUtils.idToIndex(itemID));
+
+      double estimate = userFeatures.dot(itemFeatures);
+
+      double err = rating - estimate;
+      avg.addDatum(err * err);
+    }
+
+    double rmse = Math.sqrt(avg.getAverage());
+    log.info("RMSE: {}", rmse);
+
+    assertTrue(rmse < 0.2);
+  }
+
   protected static String preferencesAsText(Matrix preferences) {
     StringBuilder prefsAsText = new StringBuilder();
     String separator = "";
@@ -237,8 +311,64 @@ public class ParallelALSFactorizationJob
         }
       }
     }
+    System.out.println(prefsAsText.toString());
     return prefsAsText.toString();
   }
 
+  @Test
+  public void recommenderJobWithIDMapping() throws Exception {
+
+    String[] preferencesWithLongIDs = new String[] {
+        "5568227754922264005,-4758971626494767444,5.0",
+        "5568227754922264005,3688396615879561990,5.0",
+        "5568227754922264005,4594226737871995304,2.0",
+        "550945997885173934,-4758971626494767444,2.0",
+        "550945997885173934,4594226737871995304,3.0",
+        "550945997885173934,706816485922781596,5.0",
+        "2448095297482319463,3688396615879561990,5.0",
+        "2448095297482319463,706816485922781596,3.0",
+        "6839920411763636962,-4758971626494767444,3.0",
+        "6839920411763636962,706816485922781596,5.0" };
+
+    writeLines(inputFile, preferencesWithLongIDs);
+
+    ParallelALSFactorizationJob alsFactorization = new ParallelALSFactorizationJob();
+    alsFactorization.setConf(conf);
+
+    int numFeatures = 3;
+    int numIterations = 5;
+    double lambda = 0.065;
+
+    int success = alsFactorization.run(new String[] {
+        "--input", inputFile.getAbsolutePath(),
+        "--output", intermediateDir.getAbsolutePath(),
+        "--tempDir", tmpDir.getAbsolutePath(),
+        "--lambda", String.valueOf(lambda),
+        "--numFeatures", String.valueOf(numFeatures),
+        "--numIterations", String.valueOf(numIterations),
+        "--numThreadsPerSolver", String.valueOf(1),
+        "--usesLongIDs", String.valueOf(true) });
+
+    assertEquals(0, success);
+
+    // reset as we run in the same JVM
+    SharingMapper.reset();
+
+    RecommenderJob recommender = new RecommenderJob();
+
+    success = recommender.run(new String[] {
+        "--input", intermediateDir.getAbsolutePath() + "/userRatings/",
+        "--userFeatures", intermediateDir.getAbsolutePath() + "/U/",
+        "--itemFeatures", intermediateDir.getAbsolutePath() + "/M/",
+        "--numRecommendations", String.valueOf(2),
+        "--maxRating", String.valueOf(5.0),
+        "--numThreads", String.valueOf(2),
+        "--usesLongIDs", String.valueOf(true),
+        "--userIDIndex", intermediateDir.getAbsolutePath() + "/userIDIndex/",
+        "--itemIDIndex", intermediateDir.getAbsolutePath() + "/itemIDIndex/",
+        "--output", outputDir.getAbsolutePath() });
+
+    assertEquals(0, success);
+  }
 
 }

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=1490930&r1=1490929&r2=1490930&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Sat Jun  8 07:28:24 2013
@@ -39,6 +39,7 @@ import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.map.OpenIntObjectHashMap;
 import org.easymock.IArgumentMatcher;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -51,43 +52,6 @@ public final class MathHelper {
   private MathHelper() {}
 
   /**
-   * applies an {@link IArgumentMatcher} to {@link MatrixEntryWritable}s
-   */
-  public static MatrixEntryWritable matrixEntryMatches(final int row, final int col, final double value) {
-    EasyMock.reportMatcher(new IArgumentMatcher() {
-      @Override
-      public boolean matches(Object argument) {
-        if (argument instanceof MatrixEntryWritable) {
-          MatrixEntryWritable entry = (MatrixEntryWritable) argument;
-          return row == entry.getRow()
-              && col == entry.getCol()
-              && Math.abs(value - entry.getVal()) <= MahoutTestCase.EPSILON;
-        }
-        return false;
-      }
-
-      @Override
-      public void appendTo(StringBuffer buffer) {
-        buffer.append("MatrixEntry[row=").append(row)
-            .append(",col=").append(col)
-            .append(",value=").append(value).append(']');
-      }
-    });
-    return null;
-  }
-
-  /**
-   * convenience method to create a {@link MatrixEntryWritable}
-   */
-  public static MatrixEntryWritable matrixEntry(int row, int col, double value) {
-    MatrixEntryWritable entry = new MatrixEntryWritable();
-    entry.setRow(row);
-    entry.setCol(col);
-    entry.setVal(value);
-    return entry;
-  }
-
-  /**
    * convenience method to create a {@link Vector.Element}
    */
   public static Vector.Element elem(int index, double value) {
@@ -190,6 +154,26 @@ public final class MathHelper {
   }
 
   /**
+   * reads matrix rows from a SequenceFile<IntWritable,VectorWritable> into a map from row index to row vector
+   */
+  public static OpenIntObjectHashMap<Vector> readMatrixRows(Configuration conf, Path path) {
+    boolean readOneRow = false;
+    OpenIntObjectHashMap<Vector> rows = new OpenIntObjectHashMap<Vector>();
+    for (Pair<IntWritable,VectorWritable> record :
+        new SequenceFileIterable<IntWritable,VectorWritable>(path, true, conf)) {
+      IntWritable key = record.getFirst();
+      VectorWritable value = record.getSecond();
+      readOneRow = true;
+      // store each row vector under its int row index
+      rows.put(key.get(), value.get());
+    }
+    if (!readOneRow) {
+      throw new IllegalStateException("Not a single row read!");
+    }
+    return rows;
+  }
+
+  /**
    * write a two-dimensional double array to an SequenceFile<IntWritable,VectorWritable>
    */
   public static void writeDistributedRowMatrix(double[][] entries, FileSystem fs, Configuration conf, Path path)