You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/07/08 21:19:54 UTC
svn commit: r961888 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/cf/taste/hadoop/
main/java/org/apache/mahout/cf/taste/hadoop/item/
main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/
main/java/org/apache/mahout/math/hadoop/similar...
Author: srowen
Date: Thu Jul 8 19:19:53 2010
New Revision: 961888
URL: http://svn.apache.org/viewvc?rev=961888&view=rev
Log:
MAHOUT-420
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TasteHadoopUtils.java Thu Jul 8 19:19:53 2010
@@ -17,10 +17,17 @@
package org.apache.mahout.cf.taste.hadoop;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
/**
* some helper methods for the hadoop-related stuff in org.apache.mahout.cf.taste
@@ -60,4 +67,26 @@ public final class TasteHadoopUtils {
public static int idToIndex(long id) {
return 0x7FFFFFFF & ((int) id ^ (int) (id >>> 32));
}
+
+ /**
+ * reads a text-based outputfile that only contains an int
+ *
+ * @param conf
+ * @param outputDir
+ * @return
+ * @throws IOException
+ */
+ public static int readIntFromFile(Configuration conf, Path outputDir) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path outputFile = fs.listStatus(outputDir, TasteHadoopUtils.PARTS_FILTER)[0].getPath();
+ InputStream in = null;
+ try {
+ in = fs.open(outputFile);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ IOUtils.copyBytes(in, out, conf);
+ return Integer.parseInt(new String(out.toByteArray(), Charset.forName("UTF-8")).trim());
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Thu Jul 8 19:19:53 2010
@@ -30,52 +30,60 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.function.UnaryFunction;
import org.apache.mahout.math.map.OpenIntLongHashMap;
+/**
+ * <p>computes prediction values for each user</p>
+ *
+ * <pre>
+ * u = a user
+ * i = an item not yet rated by u
+ * N = all items similar to i (where similarity is usually computed by pairwisely comparing the item-vectors
+ * of the user-item matrix)
+ *
+ * Prediction(u,i) = sum(all n from N: similarity(i,n) * rating(u,n)) / sum(all n from N: abs(similarity(i,n)))
+ * </pre>
+ */
public final class AggregateAndRecommendReducer extends
- Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
+ Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String NUM_RECOMMENDATIONS = "numRecommendations";
static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
- static final String ITEMS_FILE="itemsFile";
-
- private FastIDSet itemsToRecommendFor;
-
- private static final PathFilter PARTS_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith("part-");
- }
- };
+ static final String ITEMS_FILE = "itemsFile";
+ private boolean booleanData;
private int recommendationsPerUser;
+ private FastIDSet itemsToRecommendFor;
private OpenIntLongHashMap indexItemIDMap;
@Override
protected void setup(Context context) {
Configuration jobConf = context.getConfiguration();
recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
+ booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
try {
FileSystem fs = FileSystem.get(jobConf);
Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
indexItemIDMap = new OpenIntLongHashMap();
VarIntWritable index = new VarIntWritable();
VarLongWritable id = new VarLongWritable();
- for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+ for (FileStatus status : fs.listStatus(itemIDIndexPath, TasteHadoopUtils.PARTS_FILTER)) {
String path = status.getPath().toString();
SequenceFile.Reader reader =
new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
@@ -87,15 +95,15 @@ public final class AggregateAndRecommend
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
-
+
try {
FileSystem fs = FileSystem.get(jobConf);
- String usersFilePathString = jobConf.get(ITEMS_FILE);
- if (usersFilePathString == null) {
+ String itemFilePathString = jobConf.get(ITEMS_FILE);
+ if (itemFilePathString == null) {
itemsToRecommendFor = null;
} else {
itemsToRecommendFor = new FastIDSet();
- Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+ Path usersFilePath = new Path(itemFilePathString).makeQualified(fs);
FSDataInputStream in = fs.open(usersFilePath);
for (String line : new FileLineIterable(in)) {
itemsToRecommendFor.add(Long.parseLong(line));
@@ -106,38 +114,113 @@ public final class AggregateAndRecommend
}
}
+ private static final UnaryFunction ABSOLUTE_VALUES = new UnaryFunction() {
+ @Override
+ public double apply(double value) {
+ return value < 0 ? value * -1 : value;
+ }
+ };
+
@Override
- protected void reduce(VarLongWritable key,
- Iterable<VectorWritable> values,
+ protected void reduce(VarLongWritable userID,
+ Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
+ if (booleanData) {
+ reduceBooleanData(userID, values, context);
+ } else {
+ reduceNonBooleanData(userID, values, context);
+ }
+ }
- Vector recommendationVector = null;
- for (VectorWritable vectorWritable : values) {
- recommendationVector = recommendationVector == null
- ? vectorWritable.get()
- : recommendationVector.plus(vectorWritable.get());
+ private void reduceBooleanData(VarLongWritable userID,
+ Iterable<PrefAndSimilarityColumnWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ /* having boolean data, each estimated preference can only be 1,
+ * so the computation is much simpler */
+ Vector predictionVector = null;
+ for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
+ predictionVector = predictionVector == null
+ ? prefAndSimilarityColumn.getSimilarityColumn()
+ : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
+ }
+
+ Iterator<Element> predictions = predictionVector.iterateNonZero();
+ List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>();
+ while (predictions.hasNext() && recommendations.size() < recommendationsPerUser) {
+ int itemIDIndex = predictions.next().index();
+ long itemID = indexItemIDMap.get(itemIDIndex);
+ if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
+ recommendations.add(new GenericRecommendedItem(itemID, 1f));
+ }
}
- if (recommendationVector == null) {
+
+ if (!recommendations.isEmpty()) {
+ context.write(userID, new RecommendedItemsWritable(recommendations));
+ }
+ }
+
+ private void reduceNonBooleanData(VarLongWritable userID,
+ Iterable<PrefAndSimilarityColumnWritable> values,
+ Context context) throws IOException, InterruptedException {
+ /* each entry here is the sum in the numerator of the prediction formula */
+ Vector numerators = null;
+ /* each entry here is the sum in the denominator of the prediction formula */
+ Vector denominators = null;
+ /* each entry here is the number of similar items used in the prediction formula */
+ Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+
+ for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
+ Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
+ float prefValue = prefAndSimilarityColumn.getPrefValue();
+ /* count the number of items used for each prediction */
+ Iterator<Element> usedItemsIterator = simColumn.iterateNonZero();
+ while (usedItemsIterator.hasNext()) {
+ int itemIDIndex = usedItemsIterator.next().index();
+ numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
+ }
+
+ numerators = numerators == null
+ ? prefValue == 1.0f ? simColumn.clone() : simColumn.times(prefValue)
+ : numerators.plus(prefValue == 1.0f ? simColumn : simColumn.times(prefValue));
+
+ simColumn.assign(ABSOLUTE_VALUES);
+ denominators = denominators == null ? simColumn : denominators.plus(simColumn);
+ }
+
+ if (numerators == null) {
return;
}
+ Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+ Iterator<Element> iterator = numerators.iterateNonZero();
+ while (iterator.hasNext()) {
+ Element element = iterator.next();
+ int itemIDIndex = element.index();
+ /* preference estimations must be based on at least 2 datapoints */
+ if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
+ /* compute normalized prediction */
+ double prediction = element.get() / denominators.getQuick(itemIDIndex);
+ recommendationVector.setQuick(itemIDIndex, prediction);
+ }
+ }
+
Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
- Iterator<Vector.Element> recommendationVectorIterator =
- recommendationVector.iterateNonZero();
+ Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
while (recommendationVectorIterator.hasNext()) {
Vector.Element element = recommendationVectorIterator.next();
int index = element.index();
-
- long itemId = indexItemIDMap.get(index);
- if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemId)) {
+
+ long itemID = indexItemIDMap.get(index);
+ if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
float value = (float) element.get();
if (!Float.isNaN(value)) {
if (topItems.size() < recommendationsPerUser) {
- topItems.add(new GenericRecommendedItem(itemId, value));
+ topItems.add(new GenericRecommendedItem(itemID, value));
} else if (value > topItems.peek().getValue()) {
- topItems.add(new GenericRecommendedItem(itemId, value));
+ topItems.add(new GenericRecommendedItem(itemID, value));
topItems.poll();
}
}
@@ -148,7 +231,7 @@ public final class AggregateAndRecommend
List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
recommendations.addAll(topItems);
Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
- context.write(key, new RecommendedItemsWritable(recommendations));
+ context.write(userID, new RecommendedItemsWritable(recommendations));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Thu Jul 8 19:19:53 2010
@@ -18,20 +18,18 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
public final class ItemIDIndexMapper extends
Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
-
- private static final Pattern COMMA = Pattern.compile(",");
private boolean transpose;
@@ -45,14 +43,9 @@ public final class ItemIDIndexMapper ext
protected void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
- String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
+ String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
- int index = idToIndex(itemID);
+ int index = TasteHadoopUtils.idToIndex(itemID);
context.write(new VarIntWritable(index), new VarLongWritable(itemID));
- }
-
- static int idToIndex(long itemID) {
- return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
- }
-
+ }
}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/MaybePruneRowsMapper.java Thu Jul 8 19:19:53 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+public class MaybePruneRowsMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+ private int maxSimilaritiesPerItemConsidered;
+ private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ super.setup(ctx);
+ maxSimilaritiesPerItemConsidered =
+ ctx.getConfiguration().getInt(RecommenderJob.MAX_SIMILARITIES_PER_ITEM_CONSIDERED, -1);
+ if (maxSimilaritiesPerItemConsidered < 1) {
+ throw new IllegalStateException("Maximum number of similarities per item was not correctly set!");
+ }
+ }
+
+ @Override
+ protected void map(IntWritable rowIndex, VectorWritable vectorWritable, Context ctx)
+ throws IOException, InterruptedException {
+ Vector vector = vectorWritable.get();
+ countSeen(vector);
+ vector = maybePruneVector(vector);
+ vectorWritable.set(vector);
+ vectorWritable.setWritesLaxPrecision(true);
+ ctx.write(rowIndex, vectorWritable);
+ }
+
+ private void countSeen(Vector vector) {
+ Iterator<Vector.Element> it = vector.iterateNonZero();
+ while (it.hasNext()) {
+ int index = it.next().index();
+ indexCounts.adjustOrPutValue(index, 1, 1);
+ }
+ }
+
+ private Vector maybePruneVector(Vector vector) {
+ if (vector.getNumNondefaultElements() <= maxSimilaritiesPerItemConsidered) {
+ return vector;
+ }
+
+ PriorityQueue<Integer> smallCounts =
+ new PriorityQueue<Integer>(maxSimilaritiesPerItemConsidered + 1, Collections.reverseOrder());
+ Iterator<Vector.Element> it = vector.iterateNonZero();
+ while (it.hasNext()) {
+ int count = indexCounts.get(it.next().index());
+ if (count > 0) {
+ if (smallCounts.size() < maxSimilaritiesPerItemConsidered) {
+ smallCounts.add(count);
+ } else if (count < smallCounts.peek()) {
+ smallCounts.add(count);
+ smallCounts.poll();
+ }
+ }
+ }
+
+ int greatestSmallCount = smallCounts.peek();
+ if (greatestSmallCount > 0) {
+ Iterator<Vector.Element> it2 = vector.iterateNonZero();
+ while (it2.hasNext()) {
+ Vector.Element e = it2.next();
+ if (indexCounts.get(e.index()) > greatestSmallCount) {
+ e.set(0.0);
+ }
+ }
+ }
+ return vector;
+ }
+ }
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Thu Jul 8 19:19:53 2010
@@ -21,53 +21,37 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
-import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
+/**
+ * maps similar items and their preference values per user
+ */
public final class PartialMultiplyMapper extends
- Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
+ Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
@Override
protected void map(VarIntWritable key,
VectorAndPrefsWritable vectorAndPrefsWritable,
Context context) throws IOException, InterruptedException {
- int itemIndex = key.get();
-
- Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
+ Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
List<Float> prefValues = vectorAndPrefsWritable.getValues();
VarLongWritable userIDWritable = new VarLongWritable();
-
- // These single-element vectors ensure that each user will not be recommended
- // this item
- Vector excludeVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
- excludeVector.set(itemIndex, Double.NaN);
- VectorWritable excludeWritable = new VectorWritable(excludeVector);
- excludeWritable.setWritesLaxPrecision(true);
- for (long userID : userIDs) {
- userIDWritable.set(userID);
- context.write(userIDWritable, excludeWritable);
- }
-
- VectorWritable vectorWritable = new VectorWritable();
- vectorWritable.setWritesLaxPrecision(true);
+ PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
for (int i = 0; i < userIDs.size(); i++) {
long userID = userIDs.get(i);
float prefValue = prefValues.get(i);
if (!Float.isNaN(prefValue)) {
- Vector partialProduct = prefValue == 1.0f ? cooccurrenceColumn : cooccurrenceColumn.times(prefValue);
+ prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
userIDWritable.set(userID);
- vectorWritable.set(partialProduct);
- context.write(userIDWritable, vectorWritable);
+ context.write(userIDWritable, prefAndSimilarityColumn);
}
}
-
}
}
\ No newline at end of file
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java Thu Jul 8 19:19:53 2010
@@ -0,0 +1,70 @@
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class PrefAndSimilarityColumnWritable implements Writable {
+
+ private float prefValue;
+ private Vector similarityColumn;
+
+ public PrefAndSimilarityColumnWritable() {
+ super();
+ }
+
+ public PrefAndSimilarityColumnWritable(float prefValue, Vector similarityColumn) {
+ super();
+ set(prefValue, similarityColumn);
+ }
+
+ public void set(float prefValue, Vector similarityColumn) {
+ this.prefValue = prefValue;
+ this.similarityColumn = similarityColumn;
+ }
+
+ public float getPrefValue() {
+ return prefValue;
+ }
+
+ public Vector getSimilarityColumn() {
+ return similarityColumn;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ prefValue = in.readFloat();
+ VectorWritable vw = new VectorWritable();
+ vw.readFields(in);
+ similarityColumn = vw.get();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(prefValue);
+ VectorWritable vw = new VectorWritable(similarityColumn);
+ vw.setWritesLaxPrecision(true);
+ vw.write(out);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PrefAndSimilarityColumnWritable) {
+ PrefAndSimilarityColumnWritable other = (PrefAndSimilarityColumnWritable) obj;
+ return prefValue == other.prefValue && similarityColumn.equals(other.similarityColumn);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return RandomUtils.hashFloat(prefValue) + 31 * similarityColumn.hashCode();
+ }
+
+
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Thu Jul 8 19:19:53 2010
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
@@ -36,29 +38,38 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersKeyWritable;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixMapper;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.PrefsToItemUserMatrixReducer;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob;
/**
* <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
- *
+ *
* <p>Command line arguments specific to this class are:</p>
- *
+ *
* <ol>
* <li>-Dmapred.input.dir=(path): Directory containing a text file containing user IDs
* for which recommendations should be computed, one per line</li>
* <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
+ * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate</li>
* <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
* <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
* <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
* <li>--booleanData (boolean): Treat input data as having to pref values (false)</li>
* <li>--maxPrefsPerUserConsidered (integer): Maximum number of preferences considered per user in
* final recommendation phase (10)</li>
- * <li>--maxCooccurrencesPerItemConsidered: Maximum number of cooccurrences considered per item
- * in count phase (100)</li>
+ * <li>--maxSimilaritiesPerItemConsidered (integer): Maximum number of similarities considered per item (optional;
+ * default 100)</li>
* </ol>
*
* <p>General command line options are documented in {@link AbstractJob}.</p>
@@ -68,8 +79,12 @@ import org.apache.mahout.math.VectorWrit
*/
public final class RecommenderJob extends AbstractJob {
+ static final String MAX_SIMILARITIES_PER_ITEM_CONSIDERED = RecommenderJob.class.getName() +
+ ".maxSimilaritiesPerItemConsidered";
+
public static final String BOOLEAN_DATA = "booleanData";
-
+ public static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED = 100;
+
@Override
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
@@ -83,15 +98,16 @@ public final class RecommenderJob extend
addOption("maxPrefsPerUserConsidered", null,
"Maximum number of preferences considered per user in final recommendation phase",
String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
- addOption("maxCooccurrencesPerItemConsidered", null,
- "Maximum number of cooccurrences considered per item in count phase",
- String.valueOf(UserVectorToCooccurrenceMapper.DEFAULT_MAX_COOCCURRENCES_PER_ITEM_CONSIDERED));
+ addOption("maxSimilaritiesPerItemConsidered", null,
+ "Maximum number of similarities considered per item ",
+ String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM_CONSIDERED));
+ addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate");
Map<String,String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
return -1;
}
-
+
Path inputPath = getInputPath();
Path outputPath = getOutputPath();
Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
@@ -100,11 +116,15 @@ public final class RecommenderJob extend
String itemsFile = parsedArgs.get("--itemsFile");
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
int maxPrefsPerUserConsidered = Integer.parseInt(parsedArgs.get("--maxPrefsPerUserConsidered"));
- int maxCooccurrencesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxCooccurrencesPerItemConsidered"));
+ int maxSimilaritiesPerItemConsidered = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItemConsidered"));
+ String similarityClassname = parsedArgs.get("--similarityClassname");
Path userVectorPath = new Path(tempDirPath, "userVectors");
Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
- Path cooccurrencePath = new Path(tempDirPath, "cooccurrence");
+ Path countUsersPath = new Path(tempDirPath, "countUsers");
+ Path itemUserMatrixPath = new Path(tempDirPath, "itemUserMatrix");
+ Path maybePruneItemUserMatrixPath = new Path(tempDirPath, "maybePruneItemUserMatrixPath");
+ Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
@@ -132,21 +152,70 @@ public final class RecommenderJob extend
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- Job toCooccurrence = prepareJob(
- userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
- UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
- UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
- SequenceFileOutputFormat.class);
- setIOSort(toCooccurrence);
- toCooccurrence.getConfiguration().setInt(UserVectorToCooccurrenceMapper.MAX_COOCCURRENCES_PER_ITEM_CONSIDERED,
- maxCooccurrencesPerItemConsidered);
- toCooccurrence.waitForCompletion(true);
+ Job countUsers = prepareJob(inputPath,
+ countUsersPath,
+ TextInputFormat.class,
+ CountUsersMapper.class,
+ CountUsersKeyWritable.class,
+ VarLongWritable.class,
+ CountUsersReducer.class,
+ VarIntWritable.class,
+ NullWritable.class,
+ TextOutputFormat.class);
+ countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
+ countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
+ countUsers.waitForCompletion(true);
+ }
+
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ Job itemUserMatrix = prepareJob(inputPath,
+ itemUserMatrixPath,
+ TextInputFormat.class,
+ PrefsToItemUserMatrixMapper.class,
+ VarIntWritable.class,
+ DistributedRowMatrix.MatrixEntryWritable.class,
+ PrefsToItemUserMatrixReducer.class,
+ IntWritable.class,
+ VectorWritable.class,
+ SequenceFileOutputFormat.class);
+ itemUserMatrix.waitForCompletion(true);
+ }
+
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ Job maybePruneItemUserMatrix = prepareJob(itemUserMatrixPath,
+ maybePruneItemUserMatrixPath,
+ SequenceFileInputFormat.class,
+ MaybePruneRowsMapper.class,
+ IntWritable.class,
+ VectorWritable.class,
+ Reducer.class,
+ IntWritable.class,
+ VectorWritable.class,
+ SequenceFileOutputFormat.class);
+ maybePruneItemUserMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ITEM_CONSIDERED,
+ maxSimilaritiesPerItemConsidered);
+ maybePruneItemUserMatrix.waitForCompletion(true);
+ }
+
+ int numberOfUsers = TasteHadoopUtils.readIntFromFile(getConf(), countUsersPath);
+
+ if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+ /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
+ * new DistributedRowMatrix(...).rowSimilarity(...) */
+ try {
+ RowSimilarityJob.main(new String[] { "-Dmapred.input.dir=" + maybePruneItemUserMatrixPath.toString(),
+ "-Dmapred.output.dir=" + similarityMatrixPath.toString(), "--numberOfColumns",
+ String.valueOf(numberOfUsers), "--similarityClassname", similarityClassname, "--maxSimilaritiesPerRow",
+ String.valueOf(maxSimilaritiesPerItemConsidered + 1), "--tempDir", tempDirPath.toString() });
+ } catch (Exception e) {
+ throw new IllegalStateException("item-item-similarity computation failed", e);
+ }
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job prePartialMultiply1 = prepareJob(
- cooccurrencePath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
- CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
+ SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
prePartialMultiply1.waitForCompletion(true);
@@ -173,20 +242,19 @@ public final class RecommenderJob extend
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-
Job aggregateAndRecommend = prepareJob(
partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
- PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+ PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration jobConf = aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
- }
+ }
setIOSort(aggregateAndRecommend);
- aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
jobConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+ jobConf.setBoolean(BOOLEAN_DATA, booleanData);
aggregateAndRecommend.waitForCompletion(true);
}
@@ -213,9 +281,8 @@ public final class RecommenderJob extend
// timeout when running these jobs
conf.setInt("mapred.task.timeout", 60 * 60 * 1000);
}
-
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new RecommenderJob(), args);
}
-
}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java Thu Jul 8 19:19:53 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * maps a row of the similarity matrix to a {@link VectorOrPrefWritable}
+ *
+ * actually a column from that matrix has to be used but as the similarity matrix is symmetric,
+ * we can use a row instead of having to transpose it
+ */
+public final class SimilarityMatrixRowWrapperMapper extends
+ Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
+
+ @Override
+ protected void map(IntWritable key,
+ VectorWritable value,
+ Context context) throws IOException, InterruptedException {
+ Vector similarityMatrixRow = value.get();
+ /* remove self similarity */
+ similarityMatrixRow.set(key.get(), Double.NaN);
+ context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
+ }
+
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Thu Jul 8 19:19:53 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
@@ -52,7 +53,7 @@ public final class ToUserVectorReducer e
Context context) throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (VarLongWritable itemPref : itemPrefs) {
- int index = ItemIDIndexMapper.idToIndex(itemPref.get());
+ int index = TasteHadoopUtils.idToIndex(itemPref.get());
float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
userVector.set(index, value);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Thu Jul 8 19:19:53 2010
@@ -35,7 +35,7 @@ public final class ToVectorAndPrefReduce
List<Long> userIDs = new ArrayList<Long>();
List<Float> prefValues = new ArrayList<Float>();
- Vector cooccurrenceColumn = null;
+ Vector similarityMatrixColumn = null;
for (VectorOrPrefWritable value : values) {
if (value.getVector() == null) {
// Then this is a user-pref value
@@ -43,19 +43,19 @@ public final class ToVectorAndPrefReduce
prefValues.add(value.getValue());
} else {
// Then this is the column vector
- if (cooccurrenceColumn != null) {
- throw new IllegalStateException("Found two co-occurrence columns for item index " + key.get());
+ if (similarityMatrixColumn != null) {
+ throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
}
- cooccurrenceColumn = value.getVector();
+ similarityMatrixColumn = value.getVector();
}
}
- if (cooccurrenceColumn == null) {
+ if (similarityMatrixColumn == null) {
return;
}
- VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
+ VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
context.write(key, vectorAndPrefs);
}
-}
\ No newline at end of file
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PrefsToItemUserMatrixReducer.java Thu Jul 8 19:19:53 2010
@@ -25,7 +25,6 @@ import org.apache.mahout.math.RandomAcce
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
/**
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/RowSimilarityJob.java Thu Jul 8 19:19:53 2010
@@ -86,7 +86,7 @@ public class RowSimilarityJob extends Ab
}
@Override
- public int run(String[] args) throws Exception {
+ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
addInputOption();
addOutputOption();
@@ -341,4 +341,4 @@ public class RowSimilarityJob extends Ab
}
}
-}
+}
\ No newline at end of file
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/vector/DistributedCooccurrenceVectorSimilarity.java Thu Jul 8 19:19:53 2010
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.vector;
+
+import org.apache.mahout.math.hadoop.similarity.Cooccurrence;
+
+/**
+ * uses the co-occcurence count as vector similarity
+ */
+public class DistributedCooccurrenceVectorSimilarity extends AbstractDistributedVectorSimilarity {
+
+ @Override
+ protected double doComputeResult(int rowA, int rowB, Iterable<Cooccurrence> cooccurrences, double weightOfVectorA,
+ double weightOfVectorB, int numberOfColumns) {
+ return countElements(cooccurrences);
+ }
+
+}
Added: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java?rev=961888&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java Thu Jul 8 19:19:53 2010
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.hadoop.similarity.vector.DistributedTanimotoCoefficientVectorSimilarity;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.easymock.IArgumentMatcher;
+import org.easymock.classextension.EasyMock;
+
+public class RecommenderJobTest extends TasteTestCase {
+
+ /**
+ * tests {@link ItemIDIndexMapper}
+ *
+ * @throws Exception
+ */
+ public void testItemIDIndexMapper() throws Exception {
+ Mapper<LongWritable,Text, VarIntWritable, VarLongWritable>.Context context =
+ EasyMock.createMock(Mapper.Context.class);
+
+ context.write(new VarIntWritable(TasteHadoopUtils.idToIndex(789L)), new VarLongWritable(789L));
+ EasyMock.replay(context);
+
+ new ItemIDIndexMapper().map(new LongWritable(123L), new Text("456,789,5.0"), context);
+
+ EasyMock.verify(context);
+ }
+
+  /**
+   * Tests {@link ItemIDIndexReducer}: of the item IDs 67, 89 and 45 arriving for index 123, only the
+   * smallest one (45) is expected to be written.
+   *
+   * @throws Exception if the reducer fails
+   */
+  public void testItemIDIndexReducer() throws Exception {
+    Reducer<VarIntWritable,VarLongWritable,VarIntWritable,VarLongWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    ctx.write(new VarIntWritable(123), new VarLongWritable(45L));
+    EasyMock.replay(ctx);
+
+    List<VarLongWritable> itemIDs =
+        Arrays.asList(new VarLongWritable(67L), new VarLongWritable(89L), new VarLongWritable(45L));
+    new ItemIDIndexReducer().reduce(new VarIntWritable(123), itemIDs, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link ToItemPrefsMapper}: for each three-field line the first field becomes the key and the second
+   * and third fields are expected as an {@link EntityPrefWritable}.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testToItemPrefsMapper() throws Exception {
+    Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(new VarLongWritable(12L), new EntityPrefWritable(34L, 1f));
+    ctx.write(new VarLongWritable(56L), new EntityPrefWritable(78L, 2f));
+    EasyMock.replay(ctx);
+
+    ToItemPrefsMapper toItemPrefs = new ToItemPrefsMapper();
+    toItemPrefs.map(new LongWritable(123L), new Text("12,34,1"), ctx);
+    toItemPrefs.map(new LongWritable(456L), new Text("56,78,2"), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link ToItemPrefsMapper} with {@code booleanData} switched on: for two-field lines the expected
+   * values are plain {@link VarLongWritable}s holding the second field.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testToItemPrefsMapperBooleanData() throws Exception {
+    Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(new VarLongWritable(12L), new VarLongWritable(34L));
+    ctx.write(new VarLongWritable(56L), new VarLongWritable(78L));
+    EasyMock.replay(ctx);
+
+    ToItemPrefsMapper booleanMapper = new ToItemPrefsMapper();
+    // no job configuration is involved here, so the flag is injected via the test base class helper
+    setField(booleanMapper, "booleanData", true);
+    booleanMapper.map(new LongWritable(123L), new Text("12,34"), ctx);
+    booleanMapper.map(new LongWritable(456L), new Text("56,78"), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link ToUserVectorReducer}: the preferences of user 12 are expected as a vector whose indices are
+   * computed with {@code TasteHadoopUtils.idToIndex} from the item IDs and whose values are the preferences.
+   *
+   * @throws Exception if the reducer fails
+   */
+  public void testToUserVectorReducer() throws Exception {
+    List<VarLongWritable> prefs = new LinkedList<VarLongWritable>();
+    prefs.add(new EntityPrefWritable(34L, 1f));
+    prefs.add(new EntityPrefWritable(56L, 2f));
+
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    ctx.write(EasyMock.eq(new VarLongWritable(12L)), MathHelper.vectorMatches(
+        MathHelper.elem(TasteHadoopUtils.idToIndex(34L), 1d), MathHelper.elem(TasteHadoopUtils.idToIndex(56L), 2d)));
+
+    EasyMock.replay(ctx);
+
+    new ToUserVectorReducer().reduce(new VarLongWritable(12L), prefs, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link ToUserVectorReducer} with boolean data: plain {@link VarLongWritable} item IDs (no preference
+   * value) are expected to end up as entries of 1.0 in the user vector.
+   *
+   * @throws Exception if the reducer fails
+   */
+  public void testToUserVectorReducerWithBooleanData() throws Exception {
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    ctx.write(EasyMock.eq(new VarLongWritable(12L)), MathHelper.vectorMatches(
+        MathHelper.elem(TasteHadoopUtils.idToIndex(34L), 1d), MathHelper.elem(TasteHadoopUtils.idToIndex(56L), 1d)));
+
+    EasyMock.replay(ctx);
+
+    List<VarLongWritable> itemIDs = Arrays.asList(new VarLongWritable(34L), new VarLongWritable(56L));
+    new ToUserVectorReducer().reduce(new VarLongWritable(12L), itemIDs, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link SimilarityMatrixRowWrapperMapper}: for row 12 the diagonal entry (index 12) is not among the
+   * expected elements, only the similarities at indices 34 and 56 are.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testSimilarityMatrixRowWrapperMapper() throws Exception {
+    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(EasyMock.eq(new VarIntWritable(12)),
+        vectorOfVectorOrPrefWritableMatches(MathHelper.elem(34, 0.5d), MathHelper.elem(56, 0.7d)));
+
+    EasyMock.replay(ctx);
+
+    RandomAccessSparseVector row = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    row.set(12, 1d);
+    row.set(34, 0.5d);
+    row.set(56, 0.7d);
+
+    SimilarityMatrixRowWrapperMapper wrapper = new SimilarityMatrixRowWrapperMapper();
+    wrapper.map(new IntWritable(12), new VectorWritable(row), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher accepting a {@link VectorOrPrefWritable} whose {@link Vector} is
+   * checked against the given elements with {@code MathHelper.consistsOf}. A {@link VectorOrPrefWritable}
+   * without a vector (one holding a preference instead) is rejected rather than handing {@code null} to
+   * {@code consistsOf}.
+   *
+   * @param elements the elements the wrapped vector is expected to consist of
+   * @return always {@code null}; the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable vectorOfVectorOrPrefWritableMatches(final Vector.Element... elements) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          Vector v = ((VectorOrPrefWritable) argument).getVector();
+          return v != null && MathHelper.consistsOf(v, elements);
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // describe the expectation so a mismatch yields a readable failure message
+        buffer.append("VectorOrPrefWritable with a vector consisting of [");
+        for (int i = 0; i < elements.length; i++) {
+          if (i > 0) {
+            buffer.append(", ");
+          }
+          buffer.append(elements[i].index()).append('=').append(elements[i].get());
+        }
+        buffer.append(']');
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Tests {@link UserVectorSplitterMapper}: every entry of user 123's vector is expected under its index as a
+   * {@link VectorOrPrefWritable} carrying the user ID and the entry's value.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testUserVectorSplitterMapper() throws Exception {
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatches(123L, 0.5f));
+    ctx.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(ctx);
+
+    RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    userVector.set(34, 0.5d);
+    userVector.set(56, 0.7d);
+
+    UserVectorSplitterMapper splitter = new UserVectorSplitterMapper();
+    setField(splitter, "maxPrefsPerUserConsidered", 10);
+    splitter.map(new VarLongWritable(123L), new VectorWritable(userVector), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher accepting a {@link VectorOrPrefWritable} with exactly the given
+   * user ID and preference value.
+   *
+   * @param userID the expected user ID
+   * @param prefValue the expected preference value (compared with {@code ==})
+   * @return always {@code null}; the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable prefOfVectorOrPrefWritableMatches(final long userID, final float prefValue) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          VectorOrPrefWritable pref = (VectorOrPrefWritable) argument;
+          return pref.getUserID() == userID && pref.getValue() == prefValue;
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // describe the expectation so a mismatch yields a readable failure message
+        buffer.append("VectorOrPrefWritable with userID ").append(userID)
+            .append(" and preference value ").append(prefValue);
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Tests {@link UserVectorSplitterMapper} when recommendations are restricted to some users: user 123 is in
+   * {@code usersToRecommendFor} and gets its entries emitted, while mapping the same vector for user 456 must
+   * not produce any output (the mock has no expectation for it).
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testUserVectorSplitterMapperUserExclusion() throws Exception {
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatches(123L, 0.5f));
+    ctx.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(ctx);
+
+    FastIDSet includedUsers = new FastIDSet();
+    includedUsers.add(123L);
+
+    UserVectorSplitterMapper splitter = new UserVectorSplitterMapper();
+    setField(splitter, "maxPrefsPerUserConsidered", 10);
+    setField(splitter, "usersToRecommendFor", includedUsers);
+
+    RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    userVector.set(34, 0.5d);
+    userVector.set(56, 0.7d);
+
+    splitter.map(new VarLongWritable(123L), new VectorWritable(userVector), ctx);
+    splitter.map(new VarLongWritable(456L), new VectorWritable(userVector), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests {@link UserVectorSplitterMapper} when fewer preferences may be considered than are available: with
+   * {@code maxPrefsPerUserConsidered} set to 1, the entry at index 56 (0.7) keeps its value while the entry at
+   * index 34 (0.5) is still emitted, but with a NaN preference value.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testUserVectorSplitterMapperOnlySomePrefsConsidered() throws Exception {
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    ctx.write(EasyMock.eq(new VarIntWritable(34)), prefOfVectorOrPrefWritableMatchesNaN(123L));
+    ctx.write(EasyMock.eq(new VarIntWritable(56)), prefOfVectorOrPrefWritableMatches(123L, 0.7f));
+
+    EasyMock.replay(ctx);
+
+    RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    userVector.set(34, 0.5d);
+    userVector.set(56, 0.7d);
+
+    UserVectorSplitterMapper splitter = new UserVectorSplitterMapper();
+    setField(splitter, "maxPrefsPerUserConsidered", 1);
+    splitter.map(new VarLongWritable(123L), new VectorWritable(userVector), ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher accepting a {@link VectorOrPrefWritable} with the given user ID
+   * and a NaN preference value.
+   *
+   * @param userID the expected user ID
+   * @return always {@code null}; the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorOrPrefWritable prefOfVectorOrPrefWritableMatchesNaN(final long userID) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorOrPrefWritable) {
+          VectorOrPrefWritable pref = (VectorOrPrefWritable) argument;
+          // NaN never equals itself, hence Float.isNaN instead of ==
+          return pref.getUserID() == userID && Float.isNaN(pref.getValue());
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // describe the expectation so a mismatch yields a readable failure message
+        buffer.append("VectorOrPrefWritable with userID ").append(userID).append(" and a NaN preference value");
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Tests {@link ToVectorAndPrefReducer}: the two user preferences and the similarity column arriving for item
+   * index 1 are expected to be combined into a single {@link VectorAndPrefsWritable}.
+   *
+   * @throws Exception if the reducer fails
+   */
+  public void testToVectorAndPrefReducer() throws Exception {
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    ctx.write(EasyMock.eq(new VarIntWritable(1)), vectorAndPrefsWritableMatches(Arrays.asList(123L, 456L),
+        Arrays.asList(1f, 2f), MathHelper.elem(3, 0.5d), MathHelper.elem(7, 0.8d)));
+
+    EasyMock.replay(ctx);
+
+    Vector column = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    column.set(3, 0.5d);
+    column.set(7, 0.8d);
+
+    VectorOrPrefWritable firstPref = new VectorOrPrefWritable(123L, 1f);
+    VectorOrPrefWritable secondPref = new VectorOrPrefWritable(456L, 2f);
+    VectorOrPrefWritable columnWrapper = new VectorOrPrefWritable(column);
+    List<VectorOrPrefWritable> values = Arrays.asList(firstPref, secondPref, columnWrapper);
+
+    new ToVectorAndPrefReducer().reduce(new VarIntWritable(1), values, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher accepting a {@link VectorAndPrefsWritable} with exactly the given
+   * user IDs and preference values (compared with {@code equals}) and a vector checked against the given
+   * elements with {@code MathHelper.consistsOf}.
+   *
+   * @param userIDs the expected user IDs, in order
+   * @param prefValues the expected preference values, in order
+   * @param elements the elements the vector is expected to consist of
+   * @return always {@code null}; the matcher is registered with EasyMock as a side effect
+   */
+  public static VectorAndPrefsWritable vectorAndPrefsWritableMatches(final List<Long> userIDs,
+      final List<Float> prefValues, final Vector.Element... elements) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorAndPrefsWritable) {
+          VectorAndPrefsWritable vectorAndPrefs = (VectorAndPrefsWritable) argument;
+
+          if (!vectorAndPrefs.getUserIDs().equals(userIDs)) {
+            return false;
+          }
+          if (!vectorAndPrefs.getValues().equals(prefValues)) {
+            return false;
+          }
+          return MathHelper.consistsOf(vectorAndPrefs.getVector(), elements);
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // describe the expectation so a mismatch yields a readable failure message
+        buffer.append("VectorAndPrefsWritable with userIDs ").append(userIDs)
+            .append(", preference values ").append(prefValues)
+            .append(" and a vector consisting of [");
+        for (int i = 0; i < elements.length; i++) {
+          if (i > 0) {
+            buffer.append(", ");
+          }
+          buffer.append(elements[i].index()).append('=').append(elements[i].get());
+        }
+        buffer.append(']');
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Tests {@link ToVectorAndPrefReducer} in the error case that two similarity column vectors are supplied for
+   * the same item (which should never happen): an {@link IllegalStateException} is expected, and since the mock
+   * records no expectations, nothing may be written to the context.
+   *
+   * @throws Exception if anything other than the expected IllegalStateException goes wrong
+   */
+  public void testToVectorAndPrefReducerExceptionOn2Vectors() throws Exception {
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+
+    EasyMock.replay(context);
+
+    Vector similarityColumn1 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    Vector similarityColumn2 = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+
+    VectorOrPrefWritable similarities1 = new VectorOrPrefWritable(similarityColumn1);
+    VectorOrPrefWritable similarities2 = new VectorOrPrefWritable(similarityColumn2);
+
+    try {
+      new ToVectorAndPrefReducer().reduce(new VarIntWritable(1), Arrays.asList(similarities1, similarities2), context);
+      fail("expected an IllegalStateException for two similarity columns of the same item");
+    } catch (IllegalStateException expected) {
+      // expected: the reducer must reject a second similarity column for the same item index
+    }
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * Tests {@link PartialMultiplyMapper}: every (userID, preference) pair of a {@link VectorAndPrefsWritable} is
+   * expected to be written keyed by the user ID, together with the shared similarity column.
+   *
+   * @throws Exception if the mapper fails
+   */
+  public void testPartialMultiplyMapper() throws Exception {
+    Vector column = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    column.set(3, 0.5d);
+    column.set(7, 0.8d);
+
+    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable>.Context ctx =
+        EasyMock.createMock(Mapper.Context.class);
+
+    PrefAndSimilarityColumnWritable expectedForUser123 = new PrefAndSimilarityColumnWritable();
+    expectedForUser123.set(1f, column);
+    PrefAndSimilarityColumnWritable expectedForUser456 = new PrefAndSimilarityColumnWritable();
+    expectedForUser456.set(3f, column);
+
+    ctx.write(EasyMock.eq(new VarLongWritable(123L)), EasyMock.eq(expectedForUser123));
+    ctx.write(EasyMock.eq(new VarLongWritable(456L)), EasyMock.eq(expectedForUser456));
+
+    EasyMock.replay(ctx);
+
+    VectorAndPrefsWritable input =
+        new VectorAndPrefsWritable(column, Arrays.asList(123L, 456L), Arrays.asList(1f, 3f));
+    new PartialMultiplyMapper().map(new VarIntWritable(1), input, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+
+  /**
+   * Tests {@link AggregateAndRecommendReducer}: user 123 has preference 1 (similarities 0.1 / 0.5 to items 1 / 2)
+   * and preference 3 (similarities 0.9 / 0.5). Item 1 is expected with score 2.8 and item 2 with 2.0, which is
+   * consistent with a similarity-weighted average, e.g. (1 * 0.1 + 3 * 0.9) / (0.1 + 0.9) = 2.8.
+   *
+   * @throws Exception if the reducer fails
+   */
+  public void testAggregateAndRecommendReducer() throws Exception {
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    ctx.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f),
+        new GenericRecommendedItem(2L, 2f)));
+
+    EasyMock.replay(ctx);
+
+    RandomAccessSparseVector columnForPrefOne = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    columnForPrefOne.set(1, 0.1d);
+    columnForPrefOne.set(2, 0.5d);
+
+    RandomAccessSparseVector columnForPrefThree = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    columnForPrefThree.set(1, 0.9d);
+    columnForPrefThree.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> prefsAndColumns = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, columnForPrefOne),
+        new PrefAndSimilarityColumnWritable(3f, columnForPrefThree));
+
+    OpenIntLongHashMap indexToItemID = new OpenIntLongHashMap();
+    indexToItemID.put(1, 1L);
+    indexToItemID.put(2, 2L);
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+    setField(reducer, "indexItemIDMap", indexToItemID);
+    setField(reducer, "recommendationsPerUser", 3);
+
+    reducer.reduce(new VarLongWritable(123L), prefsAndColumns, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Tests that {@link AggregateAndRecommendReducer} leaves out items whose estimate would rest on a
+   * single preference: item index 2 appears only in the second similarity column, so only item 1
+   * (estimate (1 * 0.1 + 3 * 0.9) / (0.1 + 0.9) = 2.8) is expected to be recommended.
+   *
+   * @throws Exception if the reducer or the mocked context fails
+   */
+  public void testAggregateAndRecommendReducerExcludeRecommendationsBasedOnOneItem() throws Exception {
+    RandomAccessSparseVector firstColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    firstColumn.set(1, 0.1d);
+    RandomAccessSparseVector secondColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    secondColumn.set(1, 0.9d);
+    secondColumn.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> prefsAndColumns = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, firstColumn),
+        new PrefAndSimilarityColumnWritable(3f, secondColumn));
+
+    // item index i maps to item ID i
+    OpenIntLongHashMap indexToItemID = new OpenIntLongHashMap();
+    for (int index = 1; index <= 2; index++) {
+      indexToItemID.put(index, (long) index);
+    }
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+    setField(reducer, "indexItemIDMap", indexToItemID);
+    setField(reducer, "recommendationsPerUser", 3);
+
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context mockContext =
+        EasyMock.createMock(Reducer.Context.class);
+    mockContext.write(EasyMock.eq(new VarLongWritable(123L)),
+        recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+    EasyMock.replay(mockContext);
+
+    reducer.reduce(new VarLongWritable(123L), prefsAndColumns, mockContext);
+
+    EasyMock.verify(mockContext);
+  }
+
+  /**
+   * Tests {@link AggregateAndRecommendReducer} with at most one recommendation per user: of the two
+   * candidates (item 1 estimated at 2.8, item 2 at 2), only the higher-scored item 1 is expected.
+   *
+   * @throws Exception if the reducer or the mocked context fails
+   */
+  public void testAggregateAndRecommendReducerLimitNumberOfRecommendations() throws Exception {
+    RandomAccessSparseVector firstColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    firstColumn.set(1, 0.1d);
+    firstColumn.set(2, 0.5d);
+    RandomAccessSparseVector secondColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    secondColumn.set(1, 0.9d);
+    secondColumn.set(2, 0.5d);
+
+    List<PrefAndSimilarityColumnWritable> prefsAndColumns = Arrays.asList(
+        new PrefAndSimilarityColumnWritable(1f, firstColumn),
+        new PrefAndSimilarityColumnWritable(3f, secondColumn));
+
+    // item index i maps to item ID i
+    OpenIntLongHashMap indexToItemID = new OpenIntLongHashMap();
+    for (int index = 1; index <= 2; index++) {
+      indexToItemID.put(index, (long) index);
+    }
+
+    AggregateAndRecommendReducer reducer = new AggregateAndRecommendReducer();
+    setField(reducer, "indexItemIDMap", indexToItemID);
+    setField(reducer, "recommendationsPerUser", 1);
+
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context mockContext =
+        EasyMock.createMock(Reducer.Context.class);
+    mockContext.write(EasyMock.eq(new VarLongWritable(123L)),
+        recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+    EasyMock.replay(mockContext);
+
+    reducer.reduce(new VarLongWritable(123L), prefsAndColumns, mockContext);
+
+    EasyMock.verify(mockContext);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher that accepts a {@link RecommendedItemsWritable} whose
+   * recommended items equal the given items, in the given order.
+   *
+   * @param items the expected recommendations, in expected order
+   * @return always {@code null}; the matcher is registered with EasyMock as a side effect, so the
+   *         return value only serves as a placeholder argument of the right type
+   */
+  static RecommendedItemsWritable recommendationsMatch(final RecommendedItem... items) {
+    // snapshot the expectation now, so later changes to the varargs array cannot affect it
+    final List<RecommendedItem> expectedItems = new LinkedList<RecommendedItem>(Arrays.asList(items));
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof RecommendedItemsWritable) {
+          RecommendedItemsWritable recommendedItemsWritable = (RecommendedItemsWritable) argument;
+          return expectedItems.equals(recommendedItemsWritable.getRecommendedItems());
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        // describe the expectation so EasyMock failure messages show what was expected
+        buffer.append("recommendationsMatch(").append(expectedItems).append(')');
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Small integration test that runs the full job.
+   *
+   * As a tribute to http://www.slideshare.net/srowen/collaborative-filtering-at-scale,
+   * we recommend people food to animals in this test :)
+   *
+   * Users: dog = 1, rabbit = 2, cow = 3, donkey = 4.
+   * Items: burger = 1, hotdog = 2, berries = 3, icecream = 4.
+   *
+   * <pre>
+   *
+   * user-item-matrix
+   *
+   *              burger   hotdog   berries   icecream
+   *   dog          5        5         2          -
+   *   rabbit       2        -         3          5
+   *   cow          -        5         -          3
+   *   donkey       3        -         -          5
+   *
+   *
+   * item-item-similarity-matrix (tanimoto-coefficient of the item-vectors of the user-item-matrix;
+   * 0.66 and 0.33 stand for 2/3 and 1/3)
+   *
+   *              burger   hotdog   berries   icecream
+   *   burger       -       0.25     0.66       0.5
+   *   hotdog      0.25      -       0.33       0.25
+   *   berries     0.66     0.33      -         0.25
+   *   icecream    0.5      0.25     0.25        -
+   *
+   *
+   * Prediction(dog, icecream)   = (0.5 * 5 + 0.25 * 5 + 0.25 * 2) / (0.5 + 0.25 + 0.25)   = 4.25
+   * Prediction(rabbit, hotdog)  = (0.25 * 2 + 0.33 * 3 + 0.25 * 5) / (0.25 + 0.33 + 0.25) ~ 3.3
+   * Prediction(cow, burger)     = (0.25 * 5 + 0.5 * 3) / (0.25 + 0.5)                     ~ 3.67
+   * Prediction(cow, berries)    = (0.33 * 5 + 0.25 * 3) / (0.33 + 0.25)                   ~ 4.14
+   * Prediction(donkey, hotdog)  = (0.25 * 3 + 0.25 * 5) / (0.25 + 0.25)                   = 4
+   * Prediction(donkey, berries) = (0.66 * 3 + 0.25 * 5) / (0.66 + 0.25)                   ~ 3.55
+   *
+   * With one recommendation per user, each user is expected to get the highest-predicted item:
+   * dog -> icecream, rabbit -> hotdog, cow -> berries, donkey -> hotdog.
+   * </pre>
+   *
+   * @throws Exception if the job fails or its output cannot be read
+   */
+  public void testCompleteJob() throws Exception {
+
+    File inputFile = getTestTempFile("prefs.txt");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    writeLines(inputFile,
+        "1,1,5",
+        "1,2,5",
+        "1,3,2",
+        "2,1,2",
+        "2,3,3",
+        "2,4,5",
+        "3,2,5",
+        "3,4,3",
+        "4,1,3",
+        "4,4,5");
+
+    RecommenderJob recommenderJob = new RecommenderJob();
+
+    Configuration conf = new Configuration();
+    conf.set("mapred.input.dir", inputFile.getAbsolutePath());
+    conf.set("mapred.output.dir", outputDir.getAbsolutePath());
+    conf.setBoolean("mapred.output.compress", false);
+
+    recommenderJob.setConf(conf);
+
+    recommenderJob.run(new String[] { "--tempDir", tmpDir.getAbsolutePath(), "--similarityClassname",
+        DistributedTanimotoCoefficientVectorSimilarity.class.getName(), "--numRecommendations", String.valueOf(1) });
+
+    Map<Long,List<RecommendedItem>> recommendations = readRecommendations(new File(outputDir, "part-r-00000"));
+
+    assertEquals(4, recommendations.size());
+
+    for (Entry<Long,List<RecommendedItem>> entry : recommendations.entrySet()) {
+      long userID = entry.getKey();
+      List<RecommendedItem> items = entry.getValue();
+      assertNotNull(items);
+      assertEquals(1, items.size());
+      RecommendedItem item = items.get(0);
+
+      // expected values are the exact predictions from the table above; 4.3 / 4.1 only passed
+      // the 0.05 tolerance by a hair (4.25 and 4.14 being the real values)
+      if (userID == 1L) {
+        assertEquals(4L, item.getItemID());
+        assertEquals(4.25d, item.getValue(), 0.05d);
+      } else if (userID == 2L) {
+        assertEquals(2L, item.getItemID());
+        assertEquals(3.3d, item.getValue(), 0.05d);
+      } else if (userID == 3L) {
+        assertEquals(3L, item.getItemID());
+        assertEquals(4.14d, item.getValue(), 0.05d);
+      } else if (userID == 4L) {
+        assertEquals(2L, item.getItemID());
+        assertEquals(4d, item.getValue(), 0.05d);
+      } else {
+        // only users 1-4 exist in the input
+        fail("unexpected user ID " + userID);
+      }
+    }
+  }
+
+  /**
+   * Reads the job's text output back into a map from user ID to recommended items.
+   *
+   * Each line is expected to look like {@code userID<TAB>[itemID:value,itemID:value,...]}.
+   * Blank lines are skipped, and a user whose list is empty ({@code []}) is mapped to an empty list.
+   *
+   * @param file the output file written by the job
+   * @return the recommendations per user, in the order they appear on each line
+   * @throws IOException if the file cannot be read
+   */
+  static Map<Long,List<RecommendedItem>> readRecommendations(File file) throws IOException {
+    Map<Long,List<RecommendedItem>> recommendations = new HashMap<Long,List<RecommendedItem>>();
+    for (String line : new FileLineIterable(file)) {
+      if (line.trim().length() == 0) {
+        continue;
+      }
+      String[] keyValue = line.split("\t");
+      long userID = Long.parseLong(keyValue[0].trim());
+      // a literal (non-regex) replace is enough to strip the surrounding brackets
+      String itemList = keyValue[1].replace("[", "").replace("]", "");
+
+      List<RecommendedItem> items = new LinkedList<RecommendedItem>();
+      for (String token : itemList.split(",")) {
+        String trimmedToken = token.trim();
+        if (trimmedToken.length() == 0) {
+          // "[]" yields a single empty token, which is not an item
+          continue;
+        }
+        String[] itemTokens = trimmedToken.split(":");
+        long itemID = Long.parseLong(itemTokens[0]);
+        float value = Float.parseFloat(itemTokens[1]);
+        items.add(new GenericRecommendedItem(itemID, value));
+      }
+      recommendations.put(userID, items);
+    }
+    return recommendations;
+  }
+
+}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/MahoutTestCase.java Thu Jul 8 19:19:53 2010
@@ -81,10 +81,39 @@ public abstract class MahoutTestCase ext
return tempFileOrDir;
}
- protected static void setField(Object target, String fieldname, Object value)
+ /**
+ * Sets a (possibly private) field on an object via reflection, calling setAccessible(true) on it first.
+ * The field is looked up on the runtime class of {@code target} and, if not declared there, on its
+ * superclasses (up to but excluding {@link Object}).
+ *
+ * @param target the object whose field is set
+ * @param fieldname the name of the field to set
+ * @param value the value to assign to the field
+ * @throws NoSuchFieldException if neither the class of {@code target} nor any of its superclasses declares the field
+ * @throws IllegalAccessException propagated from {@link Field#set} if the field still cannot be written
+ */
+ protected void setField(Object target, String fieldname, Object value)
throws NoSuchFieldException, IllegalAccessException {
- Field field = target.getClass().getDeclaredField(fieldname);
+ Field field = findDeclaredField(target.getClass(), fieldname);
field.setAccessible(true);
field.set(target, value);
}
+
+  /**
+   * Finds a field declared in a class or one of its superclasses.
+   *
+   * @param inClass the class to start searching from
+   * @param fieldname the exact (case-sensitive) name of the field
+   * @return the first matching field, searching from {@code inClass} up the class hierarchy
+   * @throws NoSuchFieldException if no class in the hierarchy below {@link Object} declares the field
+   */
+  private static Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+    // walk up the hierarchy iteratively; stop at Object (or null, which getSuperclass() may return)
+    for (Class<?> current = inClass; current != null && !Object.class.equals(current);
+         current = current.getSuperclass()) {
+      for (Field field : current.getDeclaredFields()) {
+        // Java field names are case-sensitive; an ignore-case match could select the wrong field
+        if (field.getName().equals(fieldname)) {
+          return field;
+        }
+      }
+    }
+    throw new NoSuchFieldException(fieldname + " in " + inClass + " or its superclasses");
+  }
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/MathHelper.java Thu Jul 8 19:19:53 2010
@@ -136,13 +136,7 @@ public class MathHelper {
public boolean matches(Object argument) {
if (argument instanceof VectorWritable) {
Vector v = ((VectorWritable) argument).get();
- for (Element element : elements) {
- boolean matches = Math.abs(element.get() - v.get(element.index())) <= EPSILON;
- if (!matches) {
- return false;
- }
- }
- return true;
+ // consistsOf() applies the same per-element EPSILON check as the removed loop, and additionally
+ // rejects vectors whose count of non-zero, non-NaN entries differs from the number of expected elements
+ return consistsOf(v, elements);
}
return false;
}
@@ -154,6 +148,44 @@ public class MathHelper {
}
/**
+   * Checks whether {@code vector} is equivalent to the given elements. The number of its non-zero,
+   * non-NaN entries must equal {@code elements.length}, and for every expected element the vector's
+   * value at that index must lie within {@code EPSILON} of the expected value.
+   *
+   * @param vector the vector to inspect
+   * @param elements the expected elements
+   * @return {@code true} if both conditions hold, {@code false} otherwise (a NaN difference counts
+   *         as a mismatch)
+   */
+  public static boolean consistsOf(Vector vector, Vector.Element... elements) {
+    if (numberOfNoNZeroNonNaNElements(vector) != elements.length) {
+      return false;
+    }
+    for (int i = 0; i < elements.length; i++) {
+      Vector.Element expected = elements[i];
+      double difference = Math.abs(expected.get() - vector.get(expected.index()));
+      // !(x <= EPSILON) rather than x > EPSILON, so that a NaN difference is treated as a mismatch
+      if (!(difference <= EPSILON)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Counts the entries of {@code vector} that are neither 0 nor NaN.
+   *
+   * @param vector the vector to inspect
+   * @return the number of entries whose value is neither 0 nor NaN
+   */
+  public static int numberOfNoNZeroNonNaNElements(Vector vector) {
+    int count = 0;
+    Iterator<Element> nonZeros = vector.iterateNonZero();
+    while (nonZeros.hasNext()) {
+      double value = nonZeros.next().get();
+      // check for 0 explicitly instead of relying on iterateNonZero() to skip stored zero entries
+      // NOTE(review): whether it skips them may depend on the Vector implementation -- confirm
+      if (value != 0.0 && !Double.isNaN(value)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+ /**
* read a {@link Matrix} from a SequenceFile<IntWritable,VectorWritable>
* @param fs
* @param conf
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java?rev=961888&r1=961887&r2=961888&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/TestRowSimilarityJob.java Thu Jul 8 19:19:53 2010
@@ -34,8 +34,8 @@ import org.apache.mahout.math.RandomAcce
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.MathHelper;
import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.apache.mahout.math.hadoop.MathHelper;
import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob.EntriesToVectorsReducer;
import org.apache.mahout.math.hadoop.similarity.RowSimilarityJob.SimilarityReducer;
import org.apache.mahout.math.hadoop.similarity.vector.DistributedTanimotoCoefficientVectorSimilarity;
@@ -275,7 +275,7 @@ public class TestRowSimilarityJob extend
Matrix similarityMatrix =
MathHelper.readEntries(fs, conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
-
+
assertNotNull(similarityMatrix);
assertEquals(3, similarityMatrix.numCols());
assertEquals(3, similarityMatrix.numRows());
@@ -367,7 +367,7 @@ public class TestRowSimilarityJob extend
Matrix similarityMatrix =
MathHelper.readEntries(fs, conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
-
+
assertNotNull(similarityMatrix);
assertEquals(3, similarityMatrix.numCols());
assertEquals(3, similarityMatrix.numRows());