You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mahout.apache.org by dr...@apache.org on 2010/08/10 15:11:06 UTC
svn commit: r983982 - in /mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop: ./ item/ similarity/item/
Author: drew
Date: Tue Aug 10 13:11:06 2010
New Revision: 983982
URL: http://svn.apache.org/viewvc?rev=983982&view=rev
Log:
MAHOUT-457: ItemSimilarityJob and RecommenderJob don't work on Amazon ElasticMapReduce (via Sebastian Schelter)
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Tue Aug 10 13:11:06 2010
@@ -17,17 +17,22 @@
package org.apache.mahout.cf.taste.hadoop;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.regex.Pattern;
/**
* some helper methods for the hadoop-related stuff in org.apache.mahout.cf.taste
@@ -67,7 +72,37 @@ public final class TasteHadoopUtils {
public static int idToIndex(long id) {
return 0x7FFFFFFF & ((int) id ^ (int) (id >>> 32));
}
-
+
+ /**
+ * reads a binary mapping file
+ *
+ * @param itemIDIndexPathStr
+ * @param conf
+ * @return
+ */
+ public static OpenIntLongHashMap readItemIDIndexMap(String itemIDIndexPathStr, Configuration conf) {
+ OpenIntLongHashMap indexItemIDMap = new OpenIntLongHashMap();
+ try {
+ Path unqualifiedItemIDIndexPath = new Path(itemIDIndexPathStr);
+ FileSystem fs = FileSystem.get(unqualifiedItemIDIndexPath.toUri(), conf);
+ Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
+
+ VarIntWritable index = new VarIntWritable();
+ VarLongWritable id = new VarLongWritable();
+ for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+ String path = status.getPath().toString();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
+ while (reader.next(index, id)) {
+ indexItemIDMap.put(index.get(), id.get());
+ }
+ reader.close();
+ }
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ return indexItemIDMap;
+ }
+
/**
* reads a text-based outputfile that only contains an int
*
@@ -77,7 +112,7 @@ public final class TasteHadoopUtils {
* @throws IOException
*/
public static int readIntFromFile(Configuration conf, Path outputDir) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(outputDir.toUri(), conf);
Path outputFile = fs.listStatus(outputDir, TasteHadoopUtils.PARTS_FILTER)[0].getPath();
InputStream in = null;
try {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Tue Aug 10 13:11:06 2010
@@ -17,20 +17,11 @@
package org.apache.mahout.cf.taste.hadoop.item;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
@@ -40,13 +31,15 @@ import org.apache.mahout.cf.taste.impl.r
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.FileLineIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.function.UnaryFunction;
import org.apache.mahout.math.map.OpenIntLongHashMap;
+import java.io.IOException;
+import java.util.*;
+
/**
* <p>computes prediction values for each user</p>
*
@@ -77,40 +70,27 @@ public final class AggregateAndRecommend
Configuration jobConf = context.getConfiguration();
recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
- try {
- FileSystem fs = FileSystem.get(jobConf);
- Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
- indexItemIDMap = new OpenIntLongHashMap();
- VarIntWritable index = new VarIntWritable();
- VarLongWritable id = new VarLongWritable();
- for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
- String path = status.getPath().toString();
- SequenceFile.Reader reader =
- new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
- while (reader.next(index, id)) {
- indexItemIDMap.put(index.get(), id.get());
- }
- reader.close();
- }
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
+ indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(jobConf.get(ITEMID_INDEX_PATH), jobConf);
+ FSDataInputStream in = null;
try {
- FileSystem fs = FileSystem.get(jobConf);
- String itemFilePathString = jobConf.get(ITEMS_FILE);
+ String itemFilePathString = jobConf.get(ITEMS_FILE);
if (itemFilePathString == null) {
- itemsToRecommendFor = null;
+ itemsToRecommendFor = null;
} else {
- itemsToRecommendFor = new FastIDSet();
- Path usersFilePath = new Path(itemFilePathString).makeQualified(fs);
- FSDataInputStream in = fs.open(usersFilePath);
+ Path unqualifiedItemsFilePath = new Path(itemFilePathString);
+ FileSystem fs = FileSystem.get(unqualifiedItemsFilePath.toUri(), jobConf);
+ itemsToRecommendFor = new FastIDSet();
+ Path itemsFilePath = unqualifiedItemsFilePath.makeQualified(fs);
+ in = fs.open(itemsFilePath);
for (String line : new FileLineIterable(in)) {
itemsToRecommendFor.add(Long.parseLong(line));
}
}
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
+ } finally {
+ IOUtils.closeStream(in);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Tue Aug 10 13:11:06 2010
@@ -17,13 +17,8 @@
package org.apache.mahout.cf.taste.hadoop.item;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -40,11 +35,7 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersKeyWritable;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixMapper;
-import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixReducer;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.*;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
@@ -52,6 +43,12 @@ import org.apache.mahout.math.VectorWrit
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
* <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
*
@@ -238,6 +235,13 @@ public final class RecommenderJob extend
Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
+
+ /* necessary to make this job (having a combined input path) work on Amazon S3 */
+ Configuration partialMultiplyConf = partialMultiply.getConfiguration();
+ FileSystem fs = FileSystem.get(tempDirPath.toUri(), partialMultiplyConf);
+ prePartialMultiplyPath1 = prePartialMultiplyPath1.makeQualified(fs);
+ prePartialMultiplyPath2 = prePartialMultiplyPath2.makeQualified(fs);
+ SequenceFileInputFormat.setInputPaths(partialMultiply, prePartialMultiplyPath1, prePartialMultiplyPath2);
partialMultiply.waitForCompletion(true);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Tue Aug 10 13:11:06 2010
@@ -17,14 +17,11 @@
package org.apache.mahout.cf.taste.hadoop.item;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.common.FileLineIterable;
@@ -33,6 +30,10 @@ import org.apache.mahout.math.VarLongWri
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
public final class UserVectorSplitterMapper extends
Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
@@ -46,23 +47,24 @@ public final class UserVectorSplitterMap
@Override
protected void setup(Context context) {
Configuration jobConf = context.getConfiguration();
- maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED,
- DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
- try {
- FileSystem fs = FileSystem.get(jobConf);
- String usersFilePathString = jobConf.get(USERS_FILE);
- if (usersFilePathString == null) {
- usersToRecommendFor = null;
- } else {
+ maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
+ String usersFilePathString = jobConf.get(USERS_FILE);
+ if (usersFilePathString != null) {
+ FSDataInputStream in = null;
+ try {
+ Path unqualifiedUsersFilePath = new Path(usersFilePathString);
+ FileSystem fs = FileSystem.get(unqualifiedUsersFilePath.toUri(), jobConf);
usersToRecommendFor = new FastIDSet();
- Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
- FSDataInputStream in = fs.open(usersFilePath);
+ Path usersFilePath = unqualifiedUsersFilePath.makeQualified(fs);
+ in = fs.open(usersFilePath);
for (String line : new FileLineIterable(in)) {
usersToRecommendFor.add(Long.parseLong(line));
- }
+ }
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ } finally {
+ IOUtils.closeStream(in);
}
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java?rev=983982&r1=983981&r2=983982&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/MostSimilarItemPairsMapper.java Tue Aug 10 13:11:06 2010
@@ -17,31 +17,19 @@
package org.apache.mahout.cf.taste.hadoop.similarity.item;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
-import org.apache.mahout.math.VarIntWritable;
-import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;
+import java.io.IOException;
+import java.util.*;
+
public class MostSimilarItemPairsMapper
extends Mapper<IntWritable,VectorWritable,EntityEntityWritable,DoubleWritable> {
@@ -56,25 +44,7 @@ public class MostSimilarItemPairsMapper
if (maxSimilarItemsPerItem < 1) {
throw new IllegalStateException("maxSimilarItemsPerItem was not correctly set!");
}
-
- try {
- FileSystem fs = FileSystem.get(conf);
- Path itemIDIndexPath = new Path(itemIDIndexPathStr).makeQualified(fs);
- indexItemIDMap = new OpenIntLongHashMap();
- VarIntWritable index = new VarIntWritable();
- VarLongWritable id = new VarLongWritable();
- for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
- String path = status.getPath().toString();
- SequenceFile.Reader reader =
- new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), conf);
- while (reader.next(index, id)) {
- indexItemIDMap.put(index.get(), id.get());
- }
- reader.close();
- }
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
+ indexItemIDMap = TasteHadoopUtils.readItemIDIndexMap(conf.get(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR), conf);
}
@Override