You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/05/25 00:44:53 UTC
svn commit: r947844 [1/2] - in /mahout/trunk: ./
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/
core/src/main/java/org/apache...
Author: srowen
Date: Mon May 24 22:44:51 2010
New Revision: 947844
URL: http://svn.apache.org/viewvc?rev=947844&view=rev
Log:
Big commit for the item-based distributed recommender before it gets bigger. Do away with the combiner in the co-occurrence phase, as it hurt. Use the new varints everywhere. And more.
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
- copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
- copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
- copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/VarIntWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/VarLongWritable.java
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
Modified:
mahout/trunk/ (props changed)
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/Gram.java
mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKey.java
Propchange: mahout/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 24 22:44:51 2010
@@ -11,3 +11,5 @@ dist
atlassian-ide-plugin.xml
target
input
+
+.idea
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -21,12 +21,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Varint;
/** A {@link Writable} encapsulating an item ID and a count . */
-public final class EntityCountWritable extends VLongWritable implements Cloneable {
+public final class EntityCountWritable extends VarLongWritable implements Cloneable {
private int count;
@@ -59,13 +59,13 @@ public final class EntityCountWritable e
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- WritableUtils.writeVInt(out, count);
+ Varint.writeUnsignedVarInt(count, out);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- count = WritableUtils.readVInt(in);
+ count = Varint.readUnsignedVarInt(in);
}
@Override
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,8 +22,8 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Varint;
/** A {@link WritableComparable} encapsulating two items. */
public final class EntityEntityWritable
@@ -56,14 +56,14 @@ public final class EntityEntityWritable
@Override
public void write(DataOutput out) throws IOException {
- WritableUtils.writeVLong(out, aID);
- WritableUtils.writeVLong(out, bID);
+ Varint.writeSignedVarLong(aID, out);
+ Varint.writeSignedVarLong(bID, out);
}
@Override
public void readFields(DataInput in) throws IOException {
- aID = WritableUtils.readVLong(in);
- bID = WritableUtils.readVLong(in);
+ aID = Varint.readSignedVarLong(in);
+ bID = Varint.readSignedVarLong(in);
}
@Override
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -21,12 +21,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.VarLongWritable;
/** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends VLongWritable implements Cloneable {
+public final class EntityPrefWritable extends VarLongWritable implements Cloneable {
private float prefValue;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -24,9 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.Varint;
/**
* A {@link Writable} which encapsulates a list of {@link RecommendedItem}s. This is the mapper (and reducer)
@@ -57,7 +57,7 @@ public final class RecommendedItemsWrita
public void write(DataOutput out) throws IOException {
out.writeInt(recommended.size());
for (RecommendedItem item : recommended) {
- WritableUtils.writeVLong(out, item.getItemID());
+ Varint.writeSignedVarLong(item.getItemID(), out);
out.writeFloat(item.getValue());
}
@@ -68,7 +68,7 @@ public final class RecommendedItemsWrita
int size = in.readInt();
recommended = new ArrayList<RecommendedItem>(size);
for (int i = 0; i < size; i++) {
- long itemID = WritableUtils.readVLong(in);
+ long itemID = Varint.readSignedVarLong(in);
float value = in.readFloat();
RecommendedItem recommendedItem = new GenericRecommendedItem(itemID, value);
recommended.add(recommendedItem);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,21 +19,21 @@ package org.apache.mahout.cf.taste.hadoo
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.math.VarLongWritable;
import java.io.IOException;
import java.util.regex.Pattern;
-abstract class ToEntityPrefsMapper extends MapReduceBase implements
- Mapper<LongWritable,Text,VLongWritable,VLongWritable> {
+public abstract class ToEntityPrefsMapper extends MapReduceBase implements
+ Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
- static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
+ public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
private static final Pattern DELIMITER = Pattern.compile("[\t,]");
@@ -54,7 +54,7 @@ abstract class ToEntityPrefsMapper exten
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<VLongWritable,VLongWritable> output,
+ OutputCollector<VarLongWritable,VarLongWritable> output,
Reporter reporter) throws IOException {
String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString());
long userID = Long.parseLong(tokens[0]);
@@ -68,11 +68,11 @@ abstract class ToEntityPrefsMapper exten
itemID = temp;
}
if (booleanData) {
- output.collect(new VLongWritable(userID), new VLongWritable(itemID));
+ output.collect(new VarLongWritable(userID), new VarLongWritable(itemID));
} else {
float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
- output.collect(new VLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
+ output.collect(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
}
}
-}
\ No newline at end of file
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.Text;
*
* <p>
* Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts line number / line pairs as
- * {@link org.apache.hadoop.io.VLongWritable}/{@link Text} pairs.
+ * {@link org.apache.mahout.math.VarLongWritable}/{@link Text} pairs.
* </p>
*
* <p>
@@ -35,7 +35,7 @@ import org.apache.hadoop.io.Text;
* <h1>Output</h1>
*
* <p>
- * Outputs the user ID as a {@link org.apache.hadoop.io.VLongWritable} mapped to the item ID and preference as a
+ * Outputs the user ID as a {@link org.apache.mahout.math.VarLongWritable} mapped to the item ID and preference as a
* {@link EntityPrefWritable}.
* </p>
*/
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -29,9 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
@@ -41,12 +39,14 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.map.OpenIntLongHashMap;
public final class AggregateAndRecommendReducer extends MapReduceBase implements
- Reducer<VLongWritable,VectorWritable,VLongWritable,RecommendedItemsWritable> {
+ Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -68,8 +68,8 @@ public final class AggregateAndRecommend
FileSystem fs = FileSystem.get(jobConf);
Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
indexItemIDMap = new OpenIntLongHashMap();
- IntWritable index = new IntWritable();
- VLongWritable id = new VLongWritable();
+ VarIntWritable index = new VarIntWritable();
+ VarLongWritable id = new VarLongWritable();
for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
String path = status.getPath().toString();
SequenceFile.Reader reader =
@@ -85,9 +85,9 @@ public final class AggregateAndRecommend
}
@Override
- public void reduce(VLongWritable key,
+ public void reduce(VarLongWritable key,
Iterator<VectorWritable> values,
- OutputCollector<VLongWritable,RecommendedItemsWritable> output,
+ OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
Reporter reporter) throws IOException {
if (!values.hasNext()) {
return;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,21 +20,21 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public final class AggregateCombiner extends MapReduceBase implements
- Reducer<VLongWritable,VectorWritable,VLongWritable,VectorWritable> {
+ Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {
@Override
- public void reduce(VLongWritable key,
+ public void reduce(VarLongWritable key,
Iterator<VectorWritable> values,
- OutputCollector<VLongWritable,VectorWritable> output,
+ OutputCollector<VarLongWritable,VectorWritable> output,
Reporter reporter) throws IOException {
if (values.hasNext()) {
Vector partial = values.next().get();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,20 +19,20 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VectorWritable;
public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
- Mapper<IntWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
+ Mapper<VarIntWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
@Override
- public void map(IntWritable key,
+ public void map(VarIntWritable key,
VectorWritable value,
- OutputCollector<IntWritable,VectorOrPrefWritable> output,
+ OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
Reporter reporter) throws IOException {
output.collect(key, new VectorOrPrefWritable(value.get()));
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.math.Varint;
/** A {@link WritableComparable} encapsulating two item indices. */
public final class IndexIndexWritable
@@ -54,14 +55,14 @@ public final class IndexIndexWritable
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(aID);
- out.writeInt(bID);
+ Varint.writeUnsignedVarInt(aID, out);
+ Varint.writeUnsignedVarInt(bID, out);
}
@Override
public void readFields(DataInput in) throws IOException {
- aID = in.readInt();
- bID = in.readInt();
+ aID = Varint.readUnsignedVarInt(in);
+ bID = Varint.readUnsignedVarInt(in);
}
@Override
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,33 +20,42 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.regex.Pattern;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
public final class ItemIDIndexMapper extends MapReduceBase implements
- Mapper<LongWritable,Text,IntWritable,VLongWritable> {
+ Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
private static final Pattern COMMA = Pattern.compile(",");
+
+ private boolean transpose;
+
+ @Override
+ public void configure(JobConf jobConf) {
+ transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
+ }
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<IntWritable,VLongWritable> output,
+ OutputCollector<VarIntWritable,VarLongWritable> output,
Reporter reporter) throws IOException {
String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
- long itemID = Long.parseLong(tokens[1]);
+ long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
int index = idToIndex(itemID);
- output.collect(new IntWritable(index), new VLongWritable(itemID));
+ output.collect(new VarIntWritable(index), new VarLongWritable(itemID));
}
static int idToIndex(long itemID) {
return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
}
-}
\ No newline at end of file
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,20 +20,20 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
public final class ItemIDIndexReducer extends MapReduceBase implements
- Reducer<IntWritable,VLongWritable,IntWritable,VLongWritable> {
+ Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
@Override
- public void reduce(IntWritable index,
- Iterator<VLongWritable> possibleItemIDs,
- OutputCollector<IntWritable,VLongWritable> output,
+ public void reduce(VarIntWritable index,
+ Iterator<VarLongWritable> possibleItemIDs,
+ OutputCollector<VarIntWritable,VarLongWritable> output,
Reporter reporter) throws IOException {
if (possibleItemIDs.hasNext()) {
long minimumItemID = Long.MAX_VALUE;
@@ -43,7 +43,7 @@ public final class ItemIDIndexReducer ex
minimumItemID = itemID;
}
}
- output.collect(index, new VLongWritable(minimumItemID));
+ output.collect(index, new VarLongWritable(minimumItemID));
}
}
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,132 +18,61 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.List;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.function.LongFloatProcedure;
-import org.apache.mahout.math.function.LongProcedure;
-import org.apache.mahout.math.map.OpenLongFloatHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class PartialMultiplyReducer extends MapReduceBase implements
- Reducer<IntWritable,VectorOrPrefWritable,VLongWritable,VectorWritable> {
-
- private static final Logger log = LoggerFactory.getLogger(PartialMultiplyReducer.class);
-
- private static final int MAX_PRODUCTS_PER_ITEM = 100;
-
- private enum Counters {
- PRODUCTS_OUTPUT,
- PRODUCTS_SKIPPED,
- }
+
+public final class PartialMultiplyMapper extends MapReduceBase implements
+ Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
@Override
- public void reduce(IntWritable key,
- Iterator<VectorOrPrefWritable> values,
- final OutputCollector<VLongWritable,VectorWritable> output,
- final Reporter reporter) {
+ public void map(VarIntWritable key,
+ VectorAndPrefsWritable vectorAndPrefsWritable,
+ OutputCollector<VarLongWritable, VectorWritable> output,
+ Reporter reporter) throws IOException {
int itemIndex = key.get();
- OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
- Vector cooccurrenceColumn = null;
- while (values.hasNext()) {
- VectorOrPrefWritable value = values.next();
- if (value.getVector() == null) {
- // Then this is a user-pref value
- savedValues.put(value.getUserID(), value.getValue());
- } else {
- // Then this is the column vector
- if (cooccurrenceColumn != null) {
- throw new IllegalStateException("Found two co-occurrence columns for item index " + itemIndex);
- }
- cooccurrenceColumn = value.getVector();
- }
- }
+ Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
+ List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
+ List<Float> prefValues = vectorAndPrefsWritable.getValues();
- final VLongWritable userIDWritable = new VLongWritable();
+ VarLongWritable userIDWritable = new VarLongWritable();
// These single-element vectors ensure that each user will not be recommended
// this item
Vector excludeVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
excludeVector.set(itemIndex, Double.NaN);
- final VectorWritable excludeWritable = new VectorWritable(excludeVector);
+ VectorWritable excludeWritable = new VectorWritable(excludeVector);
excludeWritable.setWritesLaxPrecision(true);
- savedValues.forEachKey(new LongProcedure() {
- @Override
- public boolean apply(long userID) {
- userIDWritable.set(userID);
- try {
- output.collect(userIDWritable, excludeWritable);
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
- return true;
- }
- });
-
- if (cooccurrenceColumn == null) {
- log.info("Column vector missing for {}; continuing", itemIndex);
- return;
- }
-
- final float smallestLargeValue = findSmallestLargeValue(savedValues);
+ for (long userID : userIDs) {
+ userIDWritable.set(userID);
+ output.collect(userIDWritable, excludeWritable);
+ }
- final VectorWritable vectorWritable = new VectorWritable();
+ VectorWritable vectorWritable = new VectorWritable();
vectorWritable.setWritesLaxPrecision(true);
- final Vector theColumn = cooccurrenceColumn;
- savedValues.forEachPair(new LongFloatProcedure() {
- @Override
- public boolean apply(long userID, float value) {
- if (Math.abs(value) < smallestLargeValue) {
- reporter.incrCounter(Counters.PRODUCTS_SKIPPED, 1L);
- } else {
- try {
- Vector partialProduct = value == 1.0f ? theColumn : theColumn.times(value);
- userIDWritable.set(userID);
- vectorWritable.set(partialProduct);
- output.collect(userIDWritable, vectorWritable);
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
- reporter.incrCounter(Counters.PRODUCTS_OUTPUT, 1L);
- }
- return true;
+ for (int i = 0; i < userIDs.size(); i++) {
+ long userID = userIDs.get(i);
+ float prefValue = prefValues.get(i);
+ if (!Float.isNaN(prefValue)) {
+ Vector partialProduct = prefValue == 1.0f ? cooccurrenceColumn :
+ cooccurrenceColumn.times(prefValue);
+ userIDWritable.set(userID);
+ vectorWritable.set(partialProduct);
+ output.collect(userIDWritable, vectorWritable);
}
- });
-
- }
+ }
- private static float findSmallestLargeValue(OpenLongFloatHashMap savedValues) {
- final PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(MAX_PRODUCTS_PER_ITEM + 1);
- savedValues.forEachPair(new LongFloatProcedure() {
- @Override
- public boolean apply(long userID, float value) {
- if (topPrefValues.size() < MAX_PRODUCTS_PER_ITEM) {
- topPrefValues.add(value);
- } else {
- float absValue = Math.abs(value);
- if (absValue > topPrefValues.peek()) {
- topPrefValues.add(absValue);
- topPrefValues.poll();
- }
- }
- return true;
- }
- });
- return topPrefValues.peek();
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -27,23 +27,21 @@ import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
/**
@@ -96,24 +94,24 @@ public final class RecommenderJob extend
String userVectorPath = tempDirPath + "/userVectors";
String itemIDIndexPath = tempDirPath + "/itemIDIndex";
String cooccurrencePath = tempDirPath + "/cooccurrence";
- String parialMultiplyPath = tempDirPath + "/partialMultiply";
+ String partialMultiplyPath = tempDirPath + "/partialMultiply";
AtomicInteger currentPhase = new AtomicInteger();
JobConf itemIDIndexConf = prepareJobConf(
inputPath, itemIDIndexPath, TextInputFormat.class,
- ItemIDIndexMapper.class, IntWritable.class, VLongWritable.class,
- ItemIDIndexReducer.class, IntWritable.class, VLongWritable.class,
+ ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
+ ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
SequenceFileOutputFormat.class);
- itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
+ itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
JobClient.runJob(itemIDIndexConf);
}
JobConf toUserVectorConf = prepareJobConf(
inputPath, userVectorPath, TextInputFormat.class,
- ToItemPrefsMapper.class, VLongWritable.class, booleanData ? VLongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class, VLongWritable.class, VectorWritable.class,
+ ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+ ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
@@ -122,20 +120,18 @@ public final class RecommenderJob extend
JobConf toCooccurrenceConf = prepareJobConf(
userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
- UserVectorToCooccurrenceMapper.class, IndexIndexWritable.class, IntWritable.class,
- UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
+ UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
+ UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
setIOSort(toCooccurrenceConf);
- toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
- toCooccurrenceConf.setClass("mapred.partitioner.class", FirstIndexPartitioner.class, Partitioner.class);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
JobClient.runJob(toCooccurrenceConf);
}
JobConf partialMultiplyConf = prepareJobConf(
- cooccurrencePath, parialMultiplyPath, SequenceFileInputFormat.class,
- CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
- PartialMultiplyReducer.class, VLongWritable.class, VectorWritable.class,
+ cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
+ CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
MultipleInputs.addInputPath(
partialMultiplyConf,
@@ -153,9 +149,9 @@ public final class RecommenderJob extend
}
JobConf aggregateAndRecommendConf = prepareJobConf(
- parialMultiplyPath, outputPath, SequenceFileInputFormat.class,
- IdentityMapper.class, VLongWritable.class, VectorWritable.class,
- AggregateAndRecommendReducer.class, VLongWritable.class, RecommendedItemsWritable.class,
+ partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+ PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+ AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
setIOSort(aggregateAndRecommendConf);
aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,13 +20,13 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
@@ -34,7 +34,7 @@ import org.apache.mahout.math.VectorWrit
* <h1>Input</h1>
*
* <p>
- * Takes user IDs as {@link VLongWritable} mapped to all associated item IDs and preference values, as
+ * Takes user IDs as {@link VarLongWritable} mapped to all associated item IDs and preference values, as
* {@link EntityPrefWritable}s.
* </p>
*
@@ -48,19 +48,19 @@ import org.apache.mahout.math.VectorWrit
* </p>
*/
public final class ToUserVectorReducer extends MapReduceBase implements
- Reducer<VLongWritable,VLongWritable,VLongWritable,VectorWritable> {
+ Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
@Override
- public void reduce(VLongWritable userID,
- Iterator<VLongWritable> itemPrefs,
- OutputCollector<VLongWritable,VectorWritable> output,
+ public void reduce(VarLongWritable userID,
+ Iterator<VarLongWritable> itemPrefs,
+ OutputCollector<VarLongWritable,VectorWritable> output,
Reporter reporter) throws IOException {
if (!itemPrefs.hasNext()) {
return;
}
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
while (itemPrefs.hasNext()) {
- VLongWritable itemPref = itemPrefs.next();
+ VarLongWritable itemPref = itemPrefs.next();
int index = ItemIDIndexMapper.idToIndex(itemPref.get());
float value;
if (itemPref instanceof EntityPrefWritable) {
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,62 +18,50 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-/**
- * <h1>Input</h1>
- *
- * <p>
- * Takes user IDs as {@link VLongWritable} mapped to all associated item IDs and preference values, as
- * {@link EntityPrefWritable}s.
- * </p>
- *
- * <h1>Output</h1>
- *
- * <p>
- * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
- * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
- * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
- * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
- * </p>
- */
-public final class ToUserVectorReducer extends MapReduceBase implements
- Reducer<VLongWritable,VLongWritable,VLongWritable,VectorWritable> {
-
+public final class ToVectorAndPrefReducer extends MapReduceBase implements
+ Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
+
@Override
- public void reduce(VLongWritable userID,
- Iterator<VLongWritable> itemPrefs,
- OutputCollector<VLongWritable,VectorWritable> output,
+ public void reduce(VarIntWritable key,
+ Iterator<VectorOrPrefWritable> values,
+ OutputCollector<VarIntWritable,VectorAndPrefsWritable> output,
Reporter reporter) throws IOException {
- if (!itemPrefs.hasNext()) {
- return;
- }
- Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- while (itemPrefs.hasNext()) {
- VLongWritable itemPref = itemPrefs.next();
- int index = ItemIDIndexMapper.idToIndex(itemPref.get());
- float value;
- if (itemPref instanceof EntityPrefWritable) {
- value = ((EntityPrefWritable) itemPref).getPrefValue();
+
+ List<Long> userIDs = new ArrayList<Long>();
+ List<Float> prefValues = new ArrayList<Float>();
+ Vector cooccurrenceColumn = null;
+ while (values.hasNext()) {
+ VectorOrPrefWritable value = values.next();
+ if (value.getVector() == null) {
+ // Then this is a user-pref value
+ userIDs.add(value.getUserID());
+ prefValues.add(value.getValue());
} else {
- value = 1.0f;
+ // Then this is the column vector
+ if (cooccurrenceColumn != null) {
+ throw new IllegalStateException("Found two co-occurrence columns for item index " + key.get());
+ }
+ cooccurrenceColumn = value.getVector();
}
- userVector.set(index, value);
}
- VectorWritable vw = new VectorWritable(userVector);
- vw.setWritesLaxPrecision(true);
- output.collect(userID, vw);
+ if (cooccurrenceColumn == null) {
+ return;
+ }
+
+ VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
+ output.collect(key, vectorAndPrefs);
}
-
-}
+
+}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -21,15 +21,15 @@ import org.apache.hadoop.io.ArrayWritabl
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VIntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
public final class TupleWritable extends ArrayWritable {
public static class Field extends GenericWritable {
- private static final Class<?>[] CLASSES = {VIntWritable.class, VLongWritable.class, DoubleWritable.class,
+ private static final Class<?>[] CLASSES = {VarIntWritable.class, VarLongWritable.class, DoubleWritable.class,
Text.class};
@Override
@@ -98,8 +98,8 @@ public final class TupleWritable extends
public int getInt(int idx) {
Field field = get(idx);
Class<? extends Writable> wrappedClass = field.get().getClass();
- if (wrappedClass.equals(VIntWritable.class)) {
- return ((VIntWritable) field.get()).get();
+ if (wrappedClass.equals(VarIntWritable.class)) {
+ return ((VarIntWritable) field.get()).get();
} else {
throw new IllegalArgumentException("Not an integer: " + wrappedClass);
}
@@ -108,8 +108,8 @@ public final class TupleWritable extends
public long getLong(int idx) {
Field field = get(idx);
Class<? extends Writable> wrappedClass = field.get().getClass();
- if (wrappedClass.equals(VLongWritable.class)) {
- return ((VLongWritable) field.get()).get();
+ if (wrappedClass.equals(VarLongWritable.class)) {
+ return ((VarLongWritable) field.get()).get();
} else {
throw new IllegalArgumentException("Not a long: " + wrappedClass);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,12 +19,11 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
+import java.util.PriorityQueue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
@@ -32,12 +31,15 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public final class UserVectorSplitterMapper extends MapReduceBase implements
- Mapper<VLongWritable,VectorWritable,IntWritable,VectorOrPrefWritable> {
+ Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
+ private static final int MAX_PREFS_CONSIDERED = 10;
static final String USERS_FILE = "usersFile";
private FastIDSet usersToRecommendFor;
@@ -63,26 +65,63 @@ public final class UserVectorSplitterMap
}
@Override
- public void map(VLongWritable key,
+ public void map(VarLongWritable key,
VectorWritable value,
- OutputCollector<IntWritable,VectorOrPrefWritable> output,
+ OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
Reporter reporter) throws IOException {
long userID = key.get();
if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
return;
}
- Vector userVector = value.get();
+ Vector userVector = maybePruneUserVector(value.get());
Iterator<Vector.Element> it = userVector.iterateNonZero();
- IntWritable itemIndexWritable = new IntWritable();
+ VarIntWritable itemIndexWritable = new VarIntWritable();
VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
- int itemIndex = e.index();
- double preferenceValue = e.get();
- itemIndexWritable.set(itemIndex);
- vectorOrPref.set(userID, (float) preferenceValue);
+ itemIndexWritable.set(e.index());
+ vectorOrPref.set(userID, (float) e.get());
output.collect(itemIndexWritable, vectorOrPref);
}
}
+ private static Vector maybePruneUserVector(Vector userVector) {
+ if (userVector.getNumNondefaultElements() <= MAX_PREFS_CONSIDERED) {
+ return userVector;
+ }
+
+ float smallestLargeValue = findSmallestLargeValue(userVector);
+
+ // "Blank out" small-sized prefs to reduce the amount of partial products
+ // generated later. They're not zeroed, but NaN-ed, so they still come through
+ // and can be used to exclude these items from the user's recommendations.
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ while (it.hasNext()) {
+ Vector.Element e = it.next();
+ float absValue = Math.abs((float) e.get());
+ if (absValue < smallestLargeValue) {
+ e.set(Float.NaN);
+ }
+ }
+
+ return userVector;
+ }
+
+ private static float findSmallestLargeValue(Vector userVector) {
+ PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(MAX_PREFS_CONSIDERED + 1);
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ while (it.hasNext()) {
+ float absValue = Math.abs((float) it.next().get());
+ if (topPrefValues.size() < MAX_PREFS_CONSIDERED) {
+ topPrefValues.add(absValue);
+ } else {
+ if (absValue > topPrefValues.peek()) {
+ topPrefValues.add(absValue);
+ topPrefValues.poll();
+ }
+ }
+ }
+ return topPrefValues.peek();
+ }
+
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,49 +22,56 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.PriorityQueue;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntIntHashMap;
public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
- Mapper<VLongWritable,VectorWritable,IndexIndexWritable,IntWritable> {
+ Mapper<VarLongWritable,VectorWritable,VarIntWritable,VarIntWritable> {
private static final int MAX_PREFS_CONSIDERED = 100;
- private boolean outputGuardValue = true;
+ private enum Counters {
+ USER_PREFS_SKIPPED,
+ }
+
private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
@Override
- public void map(VLongWritable userID,
+ public void map(VarLongWritable userID,
VectorWritable userVectorWritable,
- OutputCollector<IndexIndexWritable,IntWritable> output,
+ OutputCollector<VarIntWritable,VarIntWritable> output,
Reporter reporter) throws IOException {
+
Vector userVector = userVectorWritable.get();
countSeen(userVector);
- userVector = maybePruneUserVector(userVector);
+
+ int originalSize = userVector.getNumNondefaultElements();
+ userVector = maybePruneUserVector(userVector);
+ int newSize = userVector.getNumNondefaultElements();
+ if (newSize < originalSize) {
+ reporter.incrCounter(Counters.USER_PREFS_SKIPPED, originalSize - newSize);
+ }
+
Iterator<Vector.Element> it = userVector.iterateNonZero();
- IndexIndexWritable entityEntity = new IndexIndexWritable();
- IntWritable one = new IntWritable(1);
+ VarIntWritable itemIndex1 = new VarIntWritable();
+ VarIntWritable itemIndex2 = new VarIntWritable();
while (it.hasNext()) {
int index1 = it.next().index();
+ itemIndex1.set(index1);
Iterator<Vector.Element> it2 = userVector.iterateNonZero();
while (it2.hasNext()) {
int index2 = it2.next().index();
- entityEntity.set(index1, index2);
- output.collect(entityEntity, one);
+ itemIndex2.set(index2);
+ output.collect(itemIndex1, itemIndex2);
}
}
- // Guard value, output once, sorts after everything; will be dropped by combiner
- if (outputGuardValue) {
- output.collect(new IndexIndexWritable(Integer.MAX_VALUE, Integer.MAX_VALUE), one);
- outputGuardValue = false;
- }
}
private Vector maybePruneUserVector(Vector userVector) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,64 +20,31 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
- Reducer<IndexIndexWritable,IntWritable,IntWritable,VectorWritable> {
-
- private int lastItem1ID = Integer.MIN_VALUE;
- private int lastItem2ID = Integer.MIN_VALUE;
- private Vector cooccurrenceRow = null;
- private int count = 0;
+ Reducer<VarIntWritable,VarIntWritable,VarIntWritable,VectorWritable> {
@Override
- public void reduce(IndexIndexWritable entityEntity,
- Iterator<IntWritable> counts,
- OutputCollector<IntWritable,VectorWritable> output,
+ public void reduce(VarIntWritable itemIndex1,
+ Iterator<VarIntWritable> itemIndex2s,
+ OutputCollector<VarIntWritable,VectorWritable> output,
Reporter reporter) throws IOException {
-
- int item1ID = entityEntity.getAID();
- int item2ID = entityEntity.getBID();
- int sum = CooccurrenceCombiner.sum(counts);
-
- if (item1ID < lastItem1ID) {
- throw new IllegalStateException();
- }
- if (item1ID == lastItem1ID) {
- if (item2ID < lastItem2ID) {
- throw new IllegalStateException();
- }
- if (item2ID == lastItem2ID) {
- count += sum;
- } else {
- if (cooccurrenceRow == null) {
- cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- }
- cooccurrenceRow.set(lastItem2ID, count);
- lastItem2ID = item2ID;
- count = sum;
- }
- } else {
- if (cooccurrenceRow != null) {
- if (count > 0) {
- cooccurrenceRow.set(lastItem2ID, count);
- }
- VectorWritable vw = new VectorWritable(cooccurrenceRow);
- vw.setWritesLaxPrecision(true);
- output.collect(new IntWritable(lastItem1ID), vw);
- }
- lastItem1ID = item1ID;
- lastItem2ID = item2ID;
- cooccurrenceRow = null;
- count = sum;
+ Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+ while (itemIndex2s.hasNext()) {
+ int itemIndex2 = itemIndex2s.next().get();
+ cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
}
+ VectorWritable vw = new VectorWritable(cooccurrenceRow);
+ vw.setWritesLaxPrecision(true);
+ output.collect(itemIndex1, vw);
}
}
\ No newline at end of file
Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java Mon May 24 22:44:51 2010
@@ -1,11 +1,10 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -21,80 +20,65 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.Varint;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-public final class VectorOrPrefWritable implements Writable {
+public final class VectorAndPrefsWritable implements Writable {
private Vector vector;
- private long userID;
- private float value;
+ private List<Long> userIDs;
+ private List<Float> values;
- public VectorOrPrefWritable() {
+ public VectorAndPrefsWritable() {
}
- public VectorOrPrefWritable(Vector vector) {
+ public VectorAndPrefsWritable(Vector vector, List<Long> userIDs, List<Float> values) {
this.vector = vector;
- }
-
- public VectorOrPrefWritable(long userID, float value) {
- this.userID = userID;
- this.value = value;
+ this.userIDs = userIDs;
+ this.values = values;
}
public Vector getVector() {
return vector;
}
- public long getUserID() {
- return userID;
- }
-
- public float getValue() {
- return value;
- }
-
- public void set(Vector vector) {
- this.vector = vector;
- this.userID = Long.MIN_VALUE;
- this.value = Float.NaN;
+ public List<Long> getUserIDs() {
+ return userIDs;
}
- public void set(long userID, float value) {
- this.vector = null;
- this.userID = userID;
- this.value = value;
+ public List<Float> getValues() {
+ return values;
}
@Override
public void write(DataOutput out) throws IOException {
- if (vector == null) {
- out.writeBoolean(false);
- WritableUtils.writeVLong(out, userID);
- out.writeFloat(value);
- } else {
- out.writeBoolean(true);
- VectorWritable vw = new VectorWritable(vector);
- vw.setWritesLaxPrecision(true);
- vw.write(out);
+ VectorWritable vw = new VectorWritable(vector);
+ vw.setWritesLaxPrecision(true);
+ vw.write(out);
+ Varint.writeUnsignedVarInt(userIDs.size(), out);
+ for (int i = 0; i < userIDs.size(); i++) {
+ Varint.writeSignedVarLong(userIDs.get(i), out);
+ out.writeFloat(values.get(i));
}
}
@Override
public void readFields(DataInput in) throws IOException {
- boolean hasVector = in.readBoolean();
- if (hasVector) {
- VectorWritable writable = new VectorWritable();
- writable.readFields(in);
- set(writable.get());
- } else {
- long theUserID = WritableUtils.readVLong(in);
- float theValue = in.readFloat();
- set(theUserID, theValue);
+ VectorWritable writable = new VectorWritable();
+ writable.readFields(in);
+ vector = writable.get();
+ int size = Varint.readUnsignedVarInt(in);
+ userIDs = new ArrayList<Long>(size);
+ values = new ArrayList<Float>(size);
+ for (int i = 0; i < size; i++) {
+ userIDs.add(Varint.readSignedVarLong(in));
+ values.add(in.readFloat());
}
}
-}
+}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Mon May 24 22:44:51 2010
@@ -23,7 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.Varint;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
@@ -73,7 +73,7 @@ public final class VectorOrPrefWritable
public void write(DataOutput out) throws IOException {
if (vector == null) {
out.writeBoolean(false);
- WritableUtils.writeVLong(out, userID);
+ Varint.writeSignedVarLong(userID, out);
out.writeFloat(value);
} else {
out.writeBoolean(true);
@@ -91,7 +91,7 @@ public final class VectorOrPrefWritable
writable.readFields(in);
set(writable.get());
} else {
- long theUserID = WritableUtils.readVLong(in);
+ long theUserID = Varint.readSignedVarLong(in);
float theValue = in.readFloat();
set(theUserID, theValue);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobClient;
@@ -34,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.math.VarLongWritable;
/**
* <p>
@@ -130,8 +130,8 @@ public final class RecommenderJob extend
int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
JobConf jobConf = prepareJobConf(usersFile, outputPath, TextInputFormat.class,
- UserIDsMapper.class, VLongWritable.class, NullWritable.class, RecommenderReducer.class,
- VLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
+ UserIDsMapper.class, VarLongWritable.class, NullWritable.class, RecommenderReducer.class,
+ VarLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -27,7 +27,6 @@ import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
@@ -39,6 +38,7 @@ import org.apache.mahout.cf.taste.impl.m
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.math.VarLongWritable;
/**
* <p>
@@ -50,7 +50,7 @@ import org.apache.mahout.cf.taste.recomm
* @see RecommenderJob
*/
public final class RecommenderReducer extends MapReduceBase implements
- Reducer<VLongWritable,NullWritable,VLongWritable,RecommendedItemsWritable> {
+ Reducer<VarLongWritable,NullWritable,VarLongWritable,RecommendedItemsWritable> {
static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -94,9 +94,9 @@ public final class RecommenderReducer ex
}
@Override
- public void reduce(VLongWritable key,
+ public void reduce(VarLongWritable key,
Iterator<NullWritable> values,
- OutputCollector<VLongWritable,RecommendedItemsWritable> output,
+ OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
Reporter reporter) throws IOException {
long userID = key.get();
List<RecommendedItem> recommendedItems;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,27 +22,27 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
/**
* Extracts and emits all user IDs from the users file, or input file.
*/
public final class UserIDsMapper extends MapReduceBase implements
- Mapper<LongWritable,Text,VLongWritable,NullWritable> {
+ Mapper<LongWritable,Text, VarLongWritable,NullWritable> {
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<VLongWritable,NullWritable> output,
+ OutputCollector<VarLongWritable,NullWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
int comma = line.indexOf(',');
long userID = comma >= 0 ? Long.parseLong(line.substring(0, comma)) : Long.parseLong(line);
- output.collect(new VLongWritable(userID), NullWritable.get());
+ output.collect(new VarLongWritable(userID), NullWritable.get());
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,22 +19,22 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
+import org.apache.mahout.math.VarLongWritable;
/**
* map out each pair of items that appears in the same user-vector together with the multiplied vector lengths
* of the associated item vectors
*/
public final class CopreferredItemsMapper extends MapReduceBase
- implements Mapper<VLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
+ implements Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
@Override
- public void map(VLongWritable user,
+ public void map(VarLongWritable user,
ItemPrefWithItemVectorWeightArrayWritable itemPrefsArray,
OutputCollector<ItemPairWritable, CoRating> output,
Reporter reporter)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,13 +22,13 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Varint;
/**
* a writable key that is used by {@link CountUsersMapper} and {@link CountUsersReducer} to
@@ -52,12 +52,12 @@ public class CountUsersKeyWritable imple
@Override
public void readFields(DataInput in) throws IOException {
- userID = WritableUtils.readVLong(in);
+ userID = Varint.readSignedVarLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
- WritableUtils.writeVLong(out, userID);
+ Varint.writeSignedVarLong(userID, out);
}
@Override
@@ -81,10 +81,10 @@ public class CountUsersKeyWritable imple
/**
* all userIDs go to the same partition
*/
- public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VLongWritable> {
+ public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VarLongWritable> {
@Override
- public int getPartition(CountUsersKeyWritable key, VLongWritable value, int numPartitions) {
+ public int getPartition(CountUsersKeyWritable key, VarLongWritable value, int numPartitions) {
return 0;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,29 +22,29 @@ import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
/**
* maps out the userIDs in a way that we can use a secondary sort on them
*/
public class CountUsersMapper extends MapReduceBase
- implements Mapper<LongWritable,Text,CountUsersKeyWritable,VLongWritable> {
+ implements Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
private static final Pattern DELIMITER = Pattern.compile("[\t,]");
@Override
public void map(LongWritable arg0, Text value,
- OutputCollector<CountUsersKeyWritable,VLongWritable> out, Reporter reporter)
+ OutputCollector<CountUsersKeyWritable,VarLongWritable> out, Reporter reporter)
throws IOException {
String[] tokens = DELIMITER.split(value.toString());
long userID = Long.parseLong(tokens[0]);
- out.collect(new CountUsersKeyWritable(userID), new VLongWritable(userID));
+ out.collect(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
}
}