You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dr...@apache.org on 2010/08/10 15:11:06 UTC

svn commit: r983982 - in /mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop: ./ item/ similarity/item/

Author: drew
Date: Tue Aug 10 13:11:06 2010
New Revision: 983982

URL: http://svn.apache.org/viewvc?rev=983982&view=rev
Log:
MAHOUT-457: ItemSimilarityJob and RecommenderJob don't work on Amazon ElasticMapReduce (via Sebastian Schelter)

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Tue Aug 10 13:11:06 2010
@@ -17,17 +17,22 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.regex.Pattern;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.regex.Pattern;
 
 /**
  * some helper methods for the hadoop-related stuff in org.apache.mahout.cf.taste
@@ -67,7 +72,37 @@ public final class TasteHadoopUtils {
   public static int idToIndex(long id) {
     return 0x7FFFFFFF & ((int) id ^ (int) (id >>> 32));
   }
-  
+
+  /**
+   * Reads the index-to-item-ID mapping from the binary sequence files found under the given path.
+   * 
+   * @param itemIDIndexPathStr path under which the files accepted by {@code PARTS_FILTER} are read
+   * @param conf configuration used to obtain the file system and to open each sequence file
+   * @return map from index to item ID; an IOException while reading is rethrown as IllegalStateException
+   */
+  public static OpenIntLongHashMap readItemIDIndexMap(String itemIDIndexPathStr, Configuration conf) {
+    OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
+    try {
+      Path unqualifiedItemIDIndexPath = new Path(itemIDIndexPathStr);
+      FileSystem fs = FileSystem.get(unqualifiedItemIDIndexPath.toUri(), conf);
+      Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
+
+      VarIntWritable index = new VarIntWritable();
+      VarLongWritable id = new VarLongWritable();
+      for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+        String path = status.getPath().toString();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
+        while (reader.next(index, id)) {
+          indexItemIDMap.put(index.get(), id.get());
+        }
+        reader.close();
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+    return indexItemIDMap;
+  }
+
   /**
    * reads a text-based outputfile that only contains an int
    * 
@@ -77,7 +112,7 @@ public final class TasteHadoopUtils {
    * @throws IOException
    */
   public static int readIntFromFile(Configuration conf, Path outputDir) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(outputDir.toUri(), conf);
     Path outputFile = fs.listStatus(outputDir, TasteHadoopUtils.PARTS_FILTER)[0].getPath();
     InputStream in = null;
     try  {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Tue Aug 10 13:11:06 2010
@@ -17,20 +17,11 @@
 
 package org.apache.mahout.cf.taste.hadoop.item;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
@@ -40,13 +31,15 @@ import org.apache.mahout.cf.taste.impl.r
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.FileLineIterable;
 import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 import org.apache.mahout.math.function.UnaryFunction;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
+import java.io.IOException;
+import java.util.*;
+
 /**
  * <p>computes prediction values for each user</p>
  *
@@ -77,40 +70,27 @@ public final class AggregateAndRecommend
     Configuration jobConf = context.getConfiguration();
     recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
     booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
-    try {
-      FileSystem fs = FileSystem.get(jobConf);
-      Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
-      indexItemIDMap = new OpenIntLongHashMap();
-      VarIntWritable index = new VarIntWritable();
-      VarLongWritable id = new VarLongWritable();
-      for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
-        String path = status.getPath().toString();
-        SequenceFile.Reader reader =
-            new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
-        while (reader.next(index, id)) {
-          indexItemIDMap.put(index.get(), id.get());
-        }
-        reader.close();
-      }
-    } catch (IOException ioe) {
-      throw new IllegalStateException(ioe);
-    }
+    indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(jobConf.get(ITEMID_INDEX_PATH), jobConf);
 
+    FSDataInputStream in = null;
     try {
-        FileSystem fs = FileSystem.get(jobConf);
-        String itemFilePathString = jobConf.get(ITEMS_FILE);
+        String itemFilePathString = jobConf.get(ITEMS_FILE);        
         if (itemFilePathString == null) {
-        	itemsToRecommendFor = null;
+          itemsToRecommendFor = null;
         } else {
-        	itemsToRecommendFor = new FastIDSet();
-          Path usersFilePath = new Path(itemFilePathString).makeQualified(fs);
-          FSDataInputStream in = fs.open(usersFilePath);
+          Path unqualifiedItemsFilePath = new Path(itemFilePathString);
+          FileSystem fs = FileSystem.get(unqualifiedItemsFilePath.toUri(), jobConf);
+          itemsToRecommendFor = new FastIDSet();
+          Path itemsFilePath = unqualifiedItemsFilePath.makeQualified(fs);
+          in = fs.open(itemsFilePath);
           for (String line : new FileLineIterable(in)) {
         	  itemsToRecommendFor.add(Long.parseLong(line));
           }
         }
       } catch (IOException ioe) {
         throw new IllegalStateException(ioe);
+      } finally {
+        IOUtils.closeStream(in);
       }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Tue Aug 10 13:11:06 2010
@@ -17,13 +17,8 @@
 
 package org.apache.mahout.cf.taste.hadoop.item;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -40,11 +35,7 @@ import org.apache.mahout.cf.taste.hadoop
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersKeyWritable;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixReducer;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.*;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
@@ -52,6 +43,12 @@ import org.apache.mahout.math.VectorWrit
 import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
  *
@@ -238,6 +235,13 @@ public final class RecommenderJob extend
         Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
         ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
         SequenceFileOutputFormat.class);
+
+      /* necessary to make this job (having a combined input path) work on Amazon S3 */
+      Configuration partialMultiplyConf = partialMultiply.getConfiguration();
+      FileSystem fs = FileSystem.get(tempDirPath.toUri(), partialMultiplyConf);
+      prePartialMultiplyPath1 = prePartialMultiplyPath1.makeQualified(fs);
+      prePartialMultiplyPath2 = prePartialMultiplyPath2.makeQualified(fs);
+      SequenceFileInputFormat.setInputPaths(partialMultiply, prePartialMultiplyPath1, prePartialMultiplyPath2);
       partialMultiply.waitForCompletion(true);
     }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Tue Aug 10 13:11:06 2010
@@ -17,14 +17,11 @@
 
 package org.apache.mahout.cf.taste.hadoop.item;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.common.FileLineIterable;
@@ -33,6 +30,10 @@ import org.apache.mahout.math.VarLongWri
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
 public final class UserVectorSplitterMapper extends
     Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
 
@@ -46,23 +47,24 @@ public final class UserVectorSplitterMap
   @Override
   protected void setup(Context context) {
     Configuration jobConf = context.getConfiguration();
-    maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED,
-                                               DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
-    try {
-      FileSystem fs = FileSystem.get(jobConf);
-      String usersFilePathString = jobConf.get(USERS_FILE);
-      if (usersFilePathString == null) {
-        usersToRecommendFor = null;
-      } else {
+    maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
+    String usersFilePathString = jobConf.get(USERS_FILE);
+    if (usersFilePathString != null) {
+      FSDataInputStream in = null;
+      try {
+        Path unqualifiedUsersFilePath = new Path(usersFilePathString);
+        FileSystem fs = FileSystem.get(unqualifiedUsersFilePath.toUri(), jobConf);
         usersToRecommendFor = new FastIDSet();
-        Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
-        FSDataInputStream in = fs.open(usersFilePath);
+        Path usersFilePath = unqualifiedUsersFilePath.makeQualified(fs);
+        in = fs.open(usersFilePath);
         for (String line : new FileLineIterable(in)) {
           usersToRecommendFor.add(Long.parseLong(line));
-        }
+        }     
+      } catch (IOException ioe) {
+        throw new IllegalStateException(ioe);
+      } finally {
+        IOUtils.closeStream(in);
       }
-    } catch (IOException ioe) {
-      throw new IllegalStateException(ioe);
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java Tue Aug 10 13:11:06 2010
@@ -17,31 +17,19 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
+import java.io.IOException;
+import java.util.*;
+
 public class MostSimilarItemPairsMapper
     extends Mapper<IntWritable,VectorWritable,EntityEntityWritable,DoubleWritable> {
 
@@ -56,25 +44,7 @@ public class MostSimilarItemPairsMapper
     if (maxSimilarItemsPerItem < 1) {
       throw new IllegalStateException("maxSimilarItemsPerItem was not correctly set!");
     }
-
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
-      indexItemIDMap = new OpenIntLongHashMap();
-      VarIntWritable index = new VarIntWritable();
-      VarLongWritable id = new VarLongWritable();
-      for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
-        String path = status.getPath().toString();
-        SequenceFile.Reader reader =
-            new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
-        while (reader.next(index, id)) {
-          indexItemIDMap.put(index.get(), id.get());
-        }
-        reader.close();
-      }
-    } catch (IOException ioe) {
-      throw new IllegalStateException(ioe);
-    }
+    indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR), conf);
   }
 
   @Override