Posted to commits@mahout.apache.org by sr...@apache.org on 2009/12/07 21:24:11 UTC
svn commit: r888122 - in
/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop:
./ item/ pseudo/
Author: srowen
Date: Mon Dec 7 20:24:07 2009
New Revision: 888122
URL: http://svn.apache.org/viewvc?rev=888122&view=rev
Log:
Simplified input to the pseudo-distributed job, assorted fixes, added output compression, and dropped some cooccurrence data
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
- copied, changed from r887673, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
- copied, changed from r887676, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
Removed:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java Mon Dec 7 20:24:07 2009
@@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
@@ -148,6 +150,7 @@
jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
+ jobConf.setBoolean("mapred.output.compress", true);
jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Mon Dec 7 20:24:07 2009
@@ -84,7 +84,7 @@
@Override
public String toString() {
- StringBuilder result = new StringBuilder(100);
+ StringBuilder result = new StringBuilder(200);
result.append('[');
boolean first = true;
for (RecommendedItem item : recommended) {
@@ -95,7 +95,12 @@
}
result.append(item.getItemID());
result.append(':');
- result.append(item.getValue());
+ String valueString = String.valueOf(item.getValue());
+ // Is this rounding too crude?
+ if (valueString.length() > 6) {
+ valueString = valueString.substring(0, 6);
+ }
+ result.append(valueString);
}
result.append(']');
return result.toString();
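
[The new toString() chops the preference value's decimal string to six characters, and the in-line comment itself asks whether that is too crude: truncation can cut mid-digit and loses a digit of precision for negative values. A standalone sketch, hypothetical and not part of the commit, of a rounding-based, locale-stable alternative:

    import java.util.Locale;

    public class ValueFormatSketch {
      public static void main(String[] args) {
        float value = 1.0f / 3.0f;  // Float.toString gives "0.33333334"
        // Truncation as committed: "0.33333334" -> "0.3333" (cut mid-digit).
        String truncated = String.valueOf(value).substring(0, 6);
        // Rounding instead, pinned to Locale.US so the decimal separator is
        // always '.' regardless of the task JVM's default locale:
        String rounded = String.format(Locale.US, "%.3f", value);
        System.out.println(truncated + " vs " + rounded);  // 0.3333 vs 0.333
      }
    }
]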
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java Mon Dec 7 20:24:07 2009
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -52,6 +53,13 @@
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
+ private static final PathFilter IGNORABLE_FILES_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().startsWith("_logs");
+ }
+ };
+
private FileSystem fs;
private Path cooccurrencePath;
private int recommendationsPerUser;
@@ -72,7 +80,7 @@
IntWritable index = new IntWritable();
LongWritable itemID = new LongWritable();
Configuration conf = new Configuration();
- for (FileStatus status : fs.listStatus(itemIDIndexPath)) {
+ for (FileStatus status : fs.listStatus(itemIDIndexPath, IGNORABLE_FILES_FILTER)) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
while (reader.next(index, itemID)) {
indexItemIDMap.put(index.get(), itemID.get());
@@ -94,7 +102,7 @@
Configuration conf = new Configuration();
Queue<RecommendedItem> topItems =
new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
- for (FileStatus status : fs.listStatus(cooccurrencePath)) {
+ for (FileStatus status : fs.listStatus(cooccurrencePath, IGNORABLE_FILES_FILTER)) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
while (reader.next(indexWritable, cooccurrenceVector)) {
Long itemID = indexItemIDMap.get(indexWritable.get());
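
[The new PathFilter exists because Hadoop drops a _logs directory inside job output directories, and handing that entry to SequenceFile.Reader would fail. A hypothetical broader variant, not in this commit, that skips any "_"- or "."-prefixed metadata entry rather than just "_logs":

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public final class HiddenFilesFilter implements PathFilter {
      @Override
      public boolean accept(Path path) {
        // Hadoop convention: "_"- and "."-prefixed entries are metadata, not data.
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    }

It plugs into the same listStatus(Path, PathFilter) overload used above.]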
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Mon Dec 7 20:24:07 2009
@@ -28,31 +28,63 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Queue;
public final class UserVectorToCooccurrenceMapper
extends MapReduceBase
implements Mapper<LongWritable, SparseVector, IntWritable, IntWritable> {
+ private static final int MAX_PREFS_CONSIDERED = 50;
+
@Override
public void map(LongWritable userID,
SparseVector userVector,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException {
+ double cutoff = userVector.size() <= MAX_PREFS_CONSIDERED ?
+ Double.NEGATIVE_INFINITY : findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
+
Iterator<Vector.Element> it = userVector.iterateNonZero();
+
while (it.hasNext()) {
- int index1 = it.next().index();
- Iterator<Vector.Element> it2 = userVector.iterateNonZero();
- IntWritable itemWritable1 = new IntWritable(index1);
- while (it2.hasNext()) {
- int index2 = it2.next().index();
- if (index1 != index2) {
- output.collect(itemWritable1, new IntWritable(index2));
+ Vector.Element next1 = it.next();
+ if (next1.get() >= cutoff) {
+
+ int index1 = next1.index();
+ Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+ IntWritable itemWritable1 = new IntWritable(index1);
+
+ while (it2.hasNext()) {
+ Vector.Element next2 = it2.next();
+ if (next2.get() >= cutoff) {
+
+ int index2 = next2.index();
+ if (index1 != index2) {
+ output.collect(itemWritable1, new IntWritable(index2));
+ }
+
+ }
}
- }
+ }
}
+ }
+ private static double findTopNPrefsCutoff(int n, Vector userVector) {
+ Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ while (it.hasNext()) {
+ double prefValue = it.next().get();
+ if (topPrefValues.size() < n) {
+ topPrefValues.add(prefValue);
+ } else if (prefValue > topPrefValues.peek()) {
+ topPrefValues.add(prefValue);
+ topPrefValues.poll();
+ }
+ }
+ return topPrefValues.peek();
}
}
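
[findTopNPrefsCutoff is the classic bounded min-heap top-N pattern: java.util.PriorityQueue is a min-heap, so after the scan its head is the smallest of the n largest preference values, which becomes the cutoff. Note that ties at the cutoff can still let slightly more than MAX_PREFS_CONSIDERED items through the >= tests above, and that the mapper only calls the method when the vector has more than MAX_PREFS_CONSIDERED entries, so peek() never unboxes null. A self-contained sketch of the same pattern, with hypothetical names:

    import java.util.PriorityQueue;
    import java.util.Queue;

    public class TopNCutoffSketch {
      static double cutoff(int n, double[] values) {
        Queue<Double> top = new PriorityQueue<Double>(n + 1);
        for (double v : values) {
          if (top.size() < n) {
            top.add(v);    // still filling: keep everything
          } else if (v > top.peek()) {
            top.add(v);    // v beats the smallest retained value...
            top.poll();    // ...which is evicted, keeping the heap at size n
          }
        }
        return top.peek(); // smallest of the n largest values
      }

      public static void main(String[] args) {
        double[] prefs = {5.0, 1.0, 4.0, 2.0, 3.0};
        System.out.println(cutoff(3, prefs));  // 3.0: only 3.0, 4.0, 5.0 pass
      }
    }
]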
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Mon Dec 7 20:24:07 2009
@@ -19,11 +19,11 @@
import org.apache.commons.cli2.Option;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -37,7 +37,7 @@
* It merely runs many {@link Recommender} instances on Hadoop, where each instance
* is a normal non-distributed implementation.</p>
*
- * <p>This class configures and runs a {@link RecommenderMapper} using Hadoop.</p>
+ * <p>This class configures and runs a {@link RecommenderReducer} using Hadoop.</p>
*
* <p>Command line arguments are:</p>
*
@@ -46,9 +46,7 @@
* recommendations. Note that it must have a constructor which takes a
* {@link org.apache.mahout.cf.taste.model.DataModel} argument.</li>
* <li>numRecommendations: Number of recommendations to compute per user</li>
- * <li>input: Directory containing a text file containing user IDs
- * for which recommendations should be computed, one per line</li>
- * <li>dataModelFile: Location of a data model file containing preference data,
+ * <li>input: Location of a data model file containing preference data,
* suitable for use with {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
* <li>output: output path where recommender output should go</li>
* <li>jarFile: JAR file containing implementation code</li>
@@ -81,9 +79,9 @@
* <p>And launch:</p>
*
* {@code
- * hadoop jar recommender.jar org.apache.mahout.cf.taste.hadoop.RecommenderJob \
+ * hadoop jar recommender.jar org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob \
* --recommenderClassName your.project.Recommender \
- * --numRecommendations 10 --input input/users.txt --dataModelFile input/input.csv \
+ * --numRecommendations 10 --input input/users.csv \
* --output output --jarFile recommender.jar
* }
*/
@@ -92,34 +90,34 @@
@Override
public int run(String[] args) throws IOException {
- Option recommendClassOpt = buildOption("recommenderClassName", "r", "Name of recommender class to instantiate", true);
- Option numReccomendationsOpt = buildOption("numRecommendations", "n", "Number of recommendations per user", true);
- Option dataModelFileOpt = buildOption("dataModelFile", "m", "File containing preference data", true);
+ Option recommendClassOpt =
+ buildOption("recommenderClassName", "r", "Name of recommender class to instantiate", true);
+ Option numReccomendationsOpt =
+ buildOption("numRecommendations", "n", "Number of recommendations per user", true);
- Map<String,Object> parsedArgs = parseArguments(args, recommendClassOpt, numReccomendationsOpt, dataModelFileOpt);
- String userIDFile = parsedArgs.get("--input").toString();
+ Map<String,Object> parsedArgs = parseArguments(args, recommendClassOpt, numReccomendationsOpt);
+ String inputFile = parsedArgs.get("--input").toString();
String outputPath = parsedArgs.get("--output").toString();
String jarFile = parsedArgs.get("--jarFile").toString();
String recommendClassName = parsedArgs.get("--recommenderClassName").toString();
int recommendationsPerUser = Integer.parseInt((String) parsedArgs.get("--numRecommendations"));
- String dataModelFile = parsedArgs.get("--dataModelFile").toString();
- JobConf jobConf = prepareJobConf(userIDFile,
+ JobConf jobConf = prepareJobConf(inputFile,
outputPath,
jarFile,
TextInputFormat.class,
- RecommenderMapper.class,
+ UserIDsMapper.class,
LongWritable.class,
- RecommendedItemsWritable.class,
- IdentityReducer.class,
+ NullWritable.class,
+ RecommenderReducer.class,
LongWritable.class,
RecommendedItemsWritable.class,
TextOutputFormat.class);
- jobConf.set(RecommenderMapper.RECOMMENDER_CLASS_NAME, recommendClassName);
- jobConf.setInt(RecommenderMapper.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
- jobConf.set(RecommenderMapper.DATA_MODEL_FILE, dataModelFile);
+ jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
+ jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+ jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile);
JobClient.runJob(jobConf);
return 0;
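
[The rewiring above turns the pseudo-distributed job into a real map/reduce pair: UserIDsMapper emits (userID, NullWritable) for every input line, the shuffle groups duplicate user IDs under one key, and RecommenderReducer then computes recommendations exactly once per unique user. NullWritable fits because the mapper has only keys to send, no payload; a tiny hypothetical demo of why it costs nothing on the wire:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;

    public class NullWritableDemo {
      public static void main(String[] args) {
        // NullWritable is a singleton whose write()/readFields() do nothing,
        // so map output values add zero bytes to the shuffle.
        NullWritable nothing = NullWritable.get();
        LongWritable userID = new LongWritable(12345L);
        System.out.println(userID + " -> " + nothing);  // 12345 -> (null)
      }
    }
]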
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (from r887673, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java&r1=887673&r2=888122&rev=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Mon Dec 7 20:24:07 2009
@@ -20,11 +20,12 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -41,19 +42,15 @@
import java.util.List;
/**
- * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and for each unique user
+ * <p>The {@link Reducer} which takes as input the user IDs parsed out by the map phase, and for each unique user
* ID, computes recommendations with the configured {@link Recommender}. The results are output as {@link
* RecommendedItemsWritable}.</p>
*
- * <p>Note that there is no corresponding {@link org.apache.hadoop.mapreduce.Reducer}; this implementation can only
- * partially take advantage of the mapreduce paradigm and only really leverages it for easy parallelization. Therefore,
- * use the {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
- *
* @see RecommenderJob
*/
-public final class RecommenderMapper
+public final class RecommenderReducer
extends MapReduceBase
- implements Mapper<LongWritable, Text, LongWritable, RecommendedItemsWritable> {
+ implements Reducer<LongWritable, NullWritable, LongWritable, RecommendedItemsWritable> {
static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -97,13 +94,13 @@
recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
}
-
@Override
- public void map(LongWritable key,
- Text value,
- OutputCollector<LongWritable, RecommendedItemsWritable> output,
- Reporter reporter) throws IOException {
- long userID = Long.parseLong(value.toString());
+ public void reduce(LongWritable key,
+ Iterator<NullWritable> values,
+ OutputCollector<LongWritable, RecommendedItemsWritable> output,
+ Reporter reporter)
+ throws IOException {
+ long userID = key.get();
List<RecommendedItem> recommendedItems;
try {
recommendedItems = recommender.recommend(userID, recommendationsPerUser);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java?rev=888122&r1=888121&r2=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ReducerMetrics.java Mon Dec 7 20:24:07 2009
@@ -17,7 +17,7 @@
package org.apache.mahout.cf.taste.hadoop.pseudo;
-/** Custom metrics collected by {@link RecommenderMapper}. */
+/** Custom metrics collected by {@link RecommenderReducer}. */
public enum ReducerMetrics {
/** Number of unique users for which recommendations were produced */
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (from r887676, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java&r1=887676&r2=888122&rev=888122&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Mon Dec 7 20:24:07 2009
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.mahout.cf.taste.hadoop.item;
+package org.apache.mahout.cf.taste.hadoop.pseudo;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
@@ -26,27 +26,22 @@
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
-import java.util.regex.Pattern;
-public final class ItemIDIndexMapper
+/**
+ * Extracts and emits all user IDs from the input file.
+ */
+public final class UserIDsMapper
extends MapReduceBase
- implements Mapper<LongWritable, Text, IntWritable, LongWritable> {
-
- private static final Pattern COMMA = Pattern.compile(",");
+ implements Mapper<LongWritable, Text, LongWritable, NullWritable> {
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<IntWritable, LongWritable> output,
+ OutputCollector<LongWritable, NullWritable> output,
Reporter reporter) throws IOException {
- String[] tokens = COMMA.split(value.toString());
- long itemID = Long.parseLong(tokens[1]);
- int index = itemIDToIndex(itemID);
- output.collect(new IntWritable(index), new LongWritable(itemID));
- }
-
- static int itemIDToIndex(long itemID) {
- return (int) (itemID) ^ (int) (itemID >>> 32);
+ String line = value.toString();
+ long userID = Long.parseLong(line.substring(0, line.indexOf(',')));
+ output.collect(new LongWritable(userID), NullWritable.get());
}
}
\ No newline at end of file
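
[UserIDsMapper above takes the user ID from the first comma-separated field, so the preference CSV itself now drives which users get recommendations, replacing the old users.txt input. Note that line.substring(0, line.indexOf(',')) throws StringIndexOutOfBoundsException on a line with no comma, and Long.parseLong throws on a blank field. A hypothetical defensive variant, not part of this commit:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import java.io.IOException;

    // Hypothetical defensive variant of UserIDsMapper: skips malformed or
    // blank lines instead of failing the whole task.
    public final class SafeUserIDsMapper
        extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, NullWritable> {

      @Override
      public void map(LongWritable key,
                      Text value,
                      OutputCollector<LongWritable, NullWritable> output,
                      Reporter reporter) throws IOException {
        String line = value.toString();
        int comma = line.indexOf(',');
        String idString = (comma >= 0 ? line.substring(0, comma) : line).trim();
        if (idString.isEmpty()) {
          return;  // skip blank lines
        }
        try {
          output.collect(new LongWritable(Long.parseLong(idString)), NullWritable.get());
        } catch (NumberFormatException nfe) {
          reporter.incrCounter("SafeUserIDsMapper", "badLines", 1L);
        }
      }
    }
]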