You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2010/09/08 07:43:11 UTC
svn commit: r993607 - in /mahout/trunk/core/src:
main/java/org/apache/mahout/cf/taste/hadoop/item/
test/java/org/apache/mahout/cf/taste/hadoop/item/
Author: ssc
Date: Wed Sep 8 05:43:10 2010
New Revision: 993607
URL: http://svn.apache.org/viewvc?rev=993607&view=rev
Log:
MAHOUT-493 Explicit filter for user/item pairs in o.a.m.cf.taste.hadoop.item.RecommenderJob
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java?rev=993607&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java Wed Sep 8 05:43:10 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds, for one item of the explicit filter file, a {@link VectorAndPrefsWritable} that removes the item from the
+ * recommendations of every user who listed it. The vector carries a single NaN entry at the item's index, and every
+ * collected user is attached with a constant preference of 1. The NaN is injected as a summand into the preference
+ * estimation, which makes {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer} exclude the
+ * item for those users.
+ */
+public class ItemFilterAsVectorAndPrefsReducer
+    extends Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable> {
+
+  @Override
+  protected void reduce(VarLongWritable itemID, Iterable<VarLongWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+    /* gather every user that must not see this item; each gets the same dummy preference */
+    List<Long> usersToFilter = new ArrayList<Long>();
+    List<Float> constantPrefs = new ArrayList<Float>();
+    for (VarLongWritable user : values) {
+      usersToFilter.add(user.get());
+      constantPrefs.add(1.0f);
+    }
+
+    int index = TasteHadoopUtils.idToIndex(itemID.get());
+    /* a lone NaN at the item's own index poisons the estimate for all users in usersToFilter */
+    Vector nanColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    nanColumn.set(index, Double.NaN);
+
+    ctx.write(new VarIntWritable(index), new VectorAndPrefsWritable(nanColumn, usersToFilter, constantPrefs));
+  }
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java?rev=993607&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java Wed Sep 8 05:43:10 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarLongWritable;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Maps every line of the explicit filter file to an (itemID, userID) pair keyed by the itemID, so that all users for
+ * whom a given item must be excluded meet in the same reduce call.
+ * <p>
+ * Each line holds a userID and an itemID separated by a comma or a tab. Surrounding whitespace is ignored. Blank
+ * lines are skipped.
+ */
+public class ItemFilterMapper extends Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {
+
+  private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
+
+  /**
+   * @param key  byte offset of the line (unused)
+   * @param line one userID,itemID pair
+   * @param ctx  receives (itemID, userID)
+   * @throws IllegalArgumentException if a non-blank line does not contain two fields
+   * @throws NumberFormatException if a field is not a valid long
+   */
+  @Override
+  protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException {
+    String trimmedLine = line.toString().trim();
+    if (trimmedLine.length() == 0) {
+      /* tolerate empty lines in hand-written filter files instead of failing the job */
+      return;
+    }
+    String[] tokens = SEPARATOR.split(trimmedLine);
+    if (tokens.length < 2) {
+      throw new IllegalArgumentException("Expected a userID,itemID pair but got: " + line);
+    }
+    /* trim each field so that input like "4, 2" parses */
+    long userID = Long.parseLong(tokens[0].trim());
+    long itemID = Long.parseLong(tokens[1].trim());
+    ctx.write(new VarLongWritable(itemID), new VarLongWritable(userID));
+  }
+}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=993607&r1=993606&r2=993607&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed Sep 8 05:43:10 2010
@@ -66,6 +66,8 @@ import java.util.regex.Pattern;
* <li>--similarityClassname (classname): Name of distributed similarity class to instantiate</li>
* <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
* <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
+ * <li>--filterFile (path): file containing userID,itemID pairs, one per line, separated by a comma or a tab. Each
+ * pair excludes that item from the recommendations for that user (optional)</li>
* <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
* <li>--booleanData (boolean): Treat input data as having to pref values (false)</li>
* <li>--maxPrefsPerUser(integer): Maximum number of preferences considered per user in
@@ -97,6 +99,8 @@ public final class RecommenderJob extend
String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
addOption("usersFile", "u", "File of users to recommend for", null);
addOption("itemsFile", "i", "File of items to recommend for", null);
+ addOption("filterFile", "f", "File containing comma-separated userID,itemID pairs. Used to exclude the item from " +
+ "the recommendations for that user(optional)", null);
addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
addOption("maxPrefsPerUser", null,
"Maximum number of preferences considered per user in final recommendation phase",
@@ -121,6 +125,7 @@ public final class RecommenderJob extend
int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
String usersFile = parsedArgs.get("--usersFile");
String itemsFile = parsedArgs.get("--itemsFile");
+ String filterFile = parsedArgs.get("--filterFile");
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
@@ -134,6 +139,7 @@ public final class RecommenderJob extend
Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
+ Path explicitFilterPath = new Path(tempDirPath, "explicitFilterPath");
Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
AtomicInteger currentPhase = new AtomicInteger();
@@ -229,11 +235,10 @@ public final class RecommenderJob extend
prePartialMultiply2.waitForCompletion(true);
Job partialMultiply = prepareJob(
- new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
- SequenceFileInputFormat.class,
- Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
- ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
- SequenceFileOutputFormat.class);
+ new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
+ SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+ SequenceFileOutputFormat.class);
/* necessary to make this job (having a combined input path) work on Amazon S3 */
Configuration partialMultiplyConf = partialMultiply.getConfiguration();
@@ -245,19 +250,42 @@ public final class RecommenderJob extend
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+ /* convert the user/item pairs to filter if a filterfile has been specified */
+ if (filterFile != null) {
+ Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
+ ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
+ ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+ SequenceFileOutputFormat.class);
+ itemFiltering.waitForCompletion(true);
+ }
+
+ String aggregateAndRecommendInput = partialMultiplyPath.toString();
+ if (filterFile != null) {
+ aggregateAndRecommendInput += "," + explicitFilterPath;
+ }
+
Job aggregateAndRecommend = prepareJob(
- partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+ new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
- Configuration jobConf = aggregateAndRecommend.getConfiguration();
+ Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
- jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+ aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+ }
+
+ if (filterFile != null) {
+ /* necessary to make this job (having a combined input path) work on Amazon S3 */
+ FileSystem fs = FileSystem.get(tempDirPath.toUri(), aggregateAndRecommendConf);
+ partialMultiplyPath = partialMultiplyPath.makeQualified(fs);
+ explicitFilterPath = explicitFilterPath.makeQualified(fs);
+ SequenceFileInputFormat.setInputPaths(aggregateAndRecommend, partialMultiplyPath, explicitFilterPath);
}
setIOSort(aggregateAndRecommend);
- jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
- jobConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
- jobConf.setBoolean(BOOLEAN_DATA, booleanData);
+ aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+ aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+ aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
aggregateAndRecommend.waitForCompletion(true);
}
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java?rev=993607&r1=993606&r2=993607&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java Wed Sep 8 05:43:10 2010
@@ -417,6 +417,78 @@ public class RecommenderJobTest extends
}
/**
+ * tests {@link org.apache.mahout.cf.taste.hadoop.item.ItemFilterMapper}
+ */
+  @Test
+  public void testItemFilterMapper() throws Exception {
+
+    Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context context =
+        EasyMock.createMock(Mapper.Context.class);
+
+    /* the mapper must swap the input order and emit (itemID, userID) */
+    context.write(new VarLongWritable(34L), new VarLongWritable(12L));
+    context.write(new VarLongWritable(78L), new VarLongWritable(56L));
+    /* the separator pattern also accepts a tab, so cover that branch too */
+    context.write(new VarLongWritable(90L), new VarLongWritable(11L));
+
+    EasyMock.replay(context);
+
+    ItemFilterMapper mapper = new ItemFilterMapper();
+    mapper.map(null, new Text("12,34"), context);
+    mapper.map(null, new Text("56,78"), context);
+    mapper.map(null, new Text("11\t90"), context);
+
+    EasyMock.verify(context);
+  }
+
+ /**
+ * tests {@link org.apache.mahout.cf.taste.hadoop.item.ItemFilterAsVectorAndPrefsReducer}
+ */
+  @Test
+  public void testItemFilterAsVectorAndPrefsReducer() throws Exception {
+    Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable>.Context ctx =
+        EasyMock.createMock(Reducer.Context.class);
+
+    long itemID = 123L;
+    VarIntWritable expectedKey = new VarIntWritable(TasteHadoopUtils.idToIndex(itemID));
+    /* exactly one record is expected: keyed by the item's index and carrying both users to filter */
+    ctx.write(EasyMock.eq(expectedKey), vectorAndPrefsForFilteringMatches(itemID, 456L, 789L));
+    EasyMock.replay(ctx);
+
+    List<VarLongWritable> usersToFilter = Arrays.asList(new VarLongWritable(456L), new VarLongWritable(789L));
+    new ItemFilterAsVectorAndPrefsReducer().reduce(new VarLongWritable(itemID), usersToFilter, ctx);
+
+    EasyMock.verify(ctx);
+  }
+
+  /**
+   * Registers an EasyMock argument matcher for a filtering {@link VectorAndPrefsWritable}. The matcher accepts a value
+   * whose vector has exactly one non-default entry, NaN at the index of {@code itemID*}, and whose user IDs are
+   * {@code userIDs}: same count, every expected ID present. Call it in place of the argument inside an expectation.
+   *
+   * @return always {@code null}; EasyMock uses the reported matcher instead of the returned value
+   */
+  static VectorAndPrefsWritable vectorAndPrefsForFilteringMatches(final long itemID, final long... userIDs) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (!(argument instanceof VectorAndPrefsWritable)) {
+          return false;
+        }
+        VectorAndPrefsWritable vectorAndPrefs = (VectorAndPrefsWritable) argument;
+        Vector vector = vectorAndPrefs.getVector();
+        if (vector.getNumNondefaultElements() != 1) {
+          return false;
+        }
+        if (!Double.isNaN(vector.get(TasteHadoopUtils.idToIndex(itemID)))) {
+          return false;
+        }
+        if (userIDs.length != vectorAndPrefs.getUserIDs().size()) {
+          return false;
+        }
+        for (long userID : userIDs) {
+          if (!vectorAndPrefs.getUserIDs().contains(userID)) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {
+        /* describe the expectation so that a failing verify() says what was wanted */
+        buffer.append("VectorAndPrefsWritable with NaN at index ").append(TasteHadoopUtils.idToIndex(itemID))
+            .append(" (item ").append(itemID).append(") and userIDs ").append(Arrays.toString(userIDs));
+      }
+    });
+    return null;
+  }
+
+ /**
* tests {@link PartialMultiplyMapper}
*/
@Test
@@ -729,6 +801,62 @@ public class RecommenderJobTest extends
assertTrue((itemID1 == 1L && itemID2 == 3L) || (itemID1 == 3L && itemID2 == 1L));
}
+ /**
+ * check whether the explicit user/item filter works
+ */
+  @Test
+  public void testCompleteJobWithFiltering() throws Exception {
+
+    File prefsFile = getTestTempFile("prefs.txt");
+    File usersToRecommendFor = getTestTempFile("users.txt");
+    File userItemFilter = getTestTempFile("filter.txt");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    /* userID,itemID,preference triples */
+    writeLines(prefsFile,
+        "1,1,5",
+        "1,2,5",
+        "1,3,2",
+        "2,1,2",
+        "2,3,3",
+        "2,4,5",
+        "3,2,5",
+        "3,4,3",
+        "4,1,3",
+        "4,4,5");
+
+    /* only compute recommendations for the donkey (user 4) ... */
+    writeLines(usersToRecommendFor, "4");
+    /* ... and never recommend the hotdog (item 2) to him */
+    writeLines(userItemFilter, "4,2");
+
+    Configuration conf = new Configuration();
+    conf.set("mapred.input.dir", prefsFile.getAbsolutePath());
+    conf.set("mapred.output.dir", outputDir.getAbsolutePath());
+    conf.setBoolean("mapred.output.compress", false);
+
+    String[] args = {
+        "--tempDir", tempDir.getAbsolutePath(),
+        "--similarityClassname", DistributedTanimotoCoefficientVectorSimilarity.class.getName(),
+        "--numRecommendations", "1",
+        "--usersFile", usersToRecommendFor.getAbsolutePath(),
+        "--filterFile", userItemFilter.getAbsolutePath()
+    };
+
+    RecommenderJob job = new RecommenderJob();
+    job.setConf(conf);
+    job.run(args);
+
+    Map<Long,List<RecommendedItem>> recommendations = readRecommendations(new File(outputDir, "part-r-00000"));
+
+    assertEquals(1, recommendations.size());
+    assertTrue(recommendations.containsKey(4L));
+    List<RecommendedItem> donkeyRecommendations = recommendations.get(4L);
+    assertEquals(1, donkeyRecommendations.size());
+
+    /* with the hotdog filtered out, the berries (item 3) are expected instead */
+    RecommendedItem top = donkeyRecommendations.get(0);
+    assertEquals(3L, top.getItemID());
+    assertEquals(3.5, top.getValue(), 0.05);
+  }
+
+
static Map<Long,List<RecommendedItem>> readRecommendations(File file) throws IOException {
Map<Long,List<RecommendedItem>> recommendations = new HashMap<Long,List<RecommendedItem>>();
Iterable<String> lineIterable = new FileLineIterable(file);