Posted to commits@mahout.apache.org by ss...@apache.org on 2010/09/08 07:43:11 UTC

svn commit: r993607 - in /mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/hadoop/item/ test/java/org/apache/mahout/cf/taste/hadoop/item/

Author: ssc
Date: Wed Sep  8 05:43:10 2010
New Revision: 993607

URL: http://svn.apache.org/viewvc?rev=993607&view=rev
Log:
MAHOUT-493 Explicit filter for user/item pairs in o.a.m.cf.taste.hadoop.item.RecommenderJob

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java?rev=993607&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java Wed Sep  8 05:43:10 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * We use a neat little trick to explicitly filter items for some users: we inject a NaN summand into the preference
+ * estimation for those items, which makes {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer}
+ * automatically exclude them.
+ */
+public class ItemFilterAsVectorAndPrefsReducer
+    extends Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable> {
+  @Override
+  protected void reduce(VarLongWritable itemID, Iterable<VarLongWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+    int itemIDIndex = TasteHadoopUtils.idToIndex(itemID.get());
+    Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    /* artificial NaN summand to exclude this item from the recommendations for all users specified in userIDs */
+    vector.set(itemIDIndex, Double.NaN);
+
+    List<Long> userIDs = new ArrayList<Long>();
+    List<Float> prefValues = new ArrayList<Float>();
+    for (VarLongWritable userID : values) {
+      userIDs.add(userID.get());
+      prefValues.add(1f);
+    }
+
+    ctx.write(new VarIntWritable(itemIDIndex), new VectorAndPrefsWritable(vector, userIDs, prefValues));
+  }
+}
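
Side note on why the NaN summand works: IEEE 754 NaN propagates through multiplication and addition, so any weighted sum that includes the injected entry becomes NaN, and a reducer that drops NaN estimates will never emit the filtered item. A minimal, self-contained sketch of that behavior (a hypothetical class for illustration, not code from this commit):

import static java.lang.Double.isNaN;

/* Illustrative sketch only: shows how an injected NaN summand poisons
 * a weighted-sum preference estimate. */
public class NaNFilterSketch {
  public static void main(String[] args) {
    double[] similarities = { 0.9, Double.NaN, 0.4 }; // NaN injected by the item filter
    double[] prefs = { 5.0, 1.0, 3.0 };
    double weightedSum = 0.0;
    double weightTotal = 0.0;
    for (int i = 0; i < prefs.length; i++) {
      weightedSum += similarities[i] * prefs[i]; // NaN propagates through * and +
      weightTotal += similarities[i];
    }
    double estimate = weightedSum / weightTotal; // NaN
    // A reducer that skips NaN estimates will never recommend this item.
    System.out.println(isNaN(estimate)); // prints true
  }
}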

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java?rev=993607&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java Wed Sep  8 05:43:10 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarLongWritable;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Maps out all user/item pairs to filter, keyed by the itemID.
+ */
+public class ItemFilterMapper extends Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {
+
+  private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
+
+  @Override
+  protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException {
+    String[] tokens = SEPARATOR.split(line.toString());
+    long userID = Long.parseLong(tokens[0]);
+    long itemID = Long.parseLong(tokens[1]);
+    ctx.write(new VarLongWritable(itemID), new VarLongWritable(userID));
+  }
+}
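
For reference, the mapper expects one pair per line, user ID first, separated by a tab or comma (see the SEPARATOR pattern above). A filter file containing, for example,

    3,101
    7,102

is mapped to the pairs (itemID 101, userID 3) and (itemID 102, userID 7), so all users filtering a given item arrive at the same reduce call of ItemFilterAsVectorAndPrefsReducer.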

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=993607&r1=993606&r2=993607&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed Sep  8 05:43:10 2010
@@ -66,6 +66,8 @@ import java.util.regex.Pattern;
  * <li>--similarityClassname (classname): Name of distributed similarity class to instantiate</li>
  * <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
  * <li>--itemsFile (path): file containing item IDs to recommend for (optional)</li>
+ * <li>--filterFile (path): file containing comma-separated userID,itemID pairs. Used to exclude the item from the
+ * recommendations for that user (optional)</li>
  * <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
 * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
 * <li>--maxPrefsPerUser (integer): Maximum number of preferences considered per user in
@@ -97,6 +99,8 @@ public final class RecommenderJob extend
         String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
     addOption("usersFile", "u", "File of users to recommend for", null);
     addOption("itemsFile", "i", "File of items to recommend for", null);
+    addOption("filterFile", "f", "File containing comma-separated userID,itemID pairs. Used to exclude the item from " +
+        "the recommendations for that user(optional)", null);
     addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString());
     addOption("maxPrefsPerUser", null,
         "Maximum number of preferences considered per user in final recommendation phase",
@@ -121,6 +125,7 @@ public final class RecommenderJob extend
     int numRecommendations = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     String usersFile = parsedArgs.get("--usersFile");
     String itemsFile = parsedArgs.get("--itemsFile");
+    String filterFile = parsedArgs.get("--filterFile");
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
     int maxPrefsPerUser = Integer.parseInt(parsedArgs.get("--maxPrefsPerUser"));
     int maxSimilaritiesPerItem = Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
@@ -134,6 +139,7 @@ public final class RecommenderJob extend
     Path similarityMatrixPath = new Path(tempDirPath, "similarityMatrix");
     Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
     Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
+    Path explicitFilterPath = new Path(tempDirPath, "explicitFilterPath");
     Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
 
     AtomicInteger currentPhase = new AtomicInteger();
@@ -229,11 +235,10 @@ public final class RecommenderJob extend
       prePartialMultiply2.waitForCompletion(true);
 
       Job partialMultiply = prepareJob(
-        new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
-        SequenceFileInputFormat.class,
-        Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
-        ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
-        SequenceFileOutputFormat.class);
+          new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
+          SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+          ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+          SequenceFileOutputFormat.class);
 
       /* necessary to make this job (having a combined input path) work on Amazon S3 */
       Configuration partialMultiplyConf = partialMultiply.getConfiguration();
@@ -245,19 +250,42 @@ public final class RecommenderJob extend
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+      /* convert the user/item pairs to filter if a filter file has been specified */
+      if (filterFile != null) {
+        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
+            ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
+            ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+            SequenceFileOutputFormat.class);
+        itemFiltering.waitForCompletion(true);
+      }
+
+      String aggregateAndRecommendInput = partialMultiplyPath.toString();
+      if (filterFile != null) {
+        aggregateAndRecommendInput += "," + explicitFilterPath;
+      }
+
       Job aggregateAndRecommend = prepareJob(
-          partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+          new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
           PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
           AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
           TextOutputFormat.class);
-      Configuration jobConf = aggregateAndRecommend.getConfiguration();
+      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
       if (itemsFile != null) {
-    	  jobConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+      }
+
+      if (filterFile != null) {
+        /* necessary to make this job (having a combined input path) work on Amazon S3 */
+        FileSystem fs = FileSystem.get(tempDirPath.toUri(), aggregateAndRecommendConf);
+        partialMultiplyPath = partialMultiplyPath.makeQualified(fs);
+        explicitFilterPath = explicitFilterPath.makeQualified(fs);
+        SequenceFileInputFormat.setInputPaths(aggregateAndRecommend, partialMultiplyPath, explicitFilterPath);
       }
       setIOSort(aggregateAndRecommend);
-      jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
-      jobConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
-      jobConf.setBoolean(BOOLEAN_DATA, booleanData);
+      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
       aggregateAndRecommend.waitForCompletion(true);
     }
 

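To tie the pieces together, a hypothetical driver for the new option might look as follows. All paths are placeholders, and we assume the standard --input/--output options provided by Mahout's AbstractJob; the similarity class name is passed in rather than hard-coded:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

/* Hypothetical driver, not part of this commit. */
public class RunRecommenderWithFilter {
  public static void main(String[] args) throws Exception {
    // args[0]: fully qualified name of the distributed similarity class,
    // e.g. the DistributedTanimotoCoefficientVectorSimilarity used in the test below
    ToolRunner.run(new Configuration(), new RecommenderJob(), new String[] {
        "--input", "/path/to/prefs.csv",       // userID,itemID,pref triples
        "--output", "/path/to/output",
        "--tempDir", "/path/to/tmp",
        "--similarityClassname", args[0],
        "--filterFile", "/path/to/filter.txt"  // userID,itemID pairs to exclude
    });
  }
}

Each filtered pair removes exactly that item from that user's recommendations; other users may still receive it.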
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java?rev=993607&r1=993606&r2=993607&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java Wed Sep  8 05:43:10 2010
@@ -417,6 +417,78 @@ public class RecommenderJobTest extends 
   }
 
   /**
+   * tests {@link org.apache.mahout.cf.taste.hadoop.item.ItemFilterMapper}
+   */
+  @Test
+  public void testItemFilterMapper() throws Exception {
+
+    Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context context =
+      EasyMock.createMock(Mapper.Context.class);
+
+    context.write(new VarLongWritable(34L), new VarLongWritable(12L));
+    context.write(new VarLongWritable(78L), new VarLongWritable(56L));
+
+    EasyMock.replay(context);
+
+    ItemFilterMapper mapper = new ItemFilterMapper();
+    mapper.map(null, new Text("12,34"), context);
+    mapper.map(null, new Text("56,78"), context);
+
+    EasyMock.verify(context);
+  }
+
+  /**
+   * tests {@link org.apache.mahout.cf.taste.hadoop.item.ItemFilterAsVectorAndPrefsReducer}
+   */
+  @Test
+  public void testItemFilterAsVectorAndPrefsReducer() throws Exception {
+    Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable>.Context context =
+        EasyMock.createMock(Reducer.Context.class);
+
+    int itemIDIndex = TasteHadoopUtils.idToIndex(123L);
+    context.write(EasyMock.eq(new VarIntWritable(itemIDIndex)), vectorAndPrefsForFilteringMatches(123L, 456L, 789L));
+
+    EasyMock.replay(context);
+
+    new ItemFilterAsVectorAndPrefsReducer().reduce(new VarLongWritable(123L), Arrays.asList(new VarLongWritable(456L),
+        new VarLongWritable(789L)), context);
+
+    EasyMock.verify(context);
+  }
+
+  static VectorAndPrefsWritable vectorAndPrefsForFilteringMatches(final long itemID, final long... userIDs) {
+    EasyMock.reportMatcher(new IArgumentMatcher() {
+      @Override
+      public boolean matches(Object argument) {
+        if (argument instanceof VectorAndPrefsWritable) {
+          VectorAndPrefsWritable vectorAndPrefs = (VectorAndPrefsWritable) argument;
+          Vector vector = vectorAndPrefs.getVector();
+          if (vector.getNumNondefaultElements() != 1) {
+            return false;
+          }
+          if (!Double.isNaN(vector.get(TasteHadoopUtils.idToIndex(itemID)))) {
+            return false;
+          }
+          if (userIDs.length != vectorAndPrefs.getUserIDs().size()) {
+            return false;
+          }
+          for (long userID : userIDs) {
+            if (!vectorAndPrefs.getUserIDs().contains(userID)) {
+              return false;
+            }
+          }
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer) {}
+    });
+    return null;
+  }
+
+  /**
    * tests {@link PartialMultiplyMapper}
    */
   @Test
@@ -729,6 +801,62 @@ public class RecommenderJobTest extends 
     assertTrue((itemID1 == 1L && itemID2 == 3L) || (itemID1 == 3L && itemID2 == 1L));
   }
 
+  /**
+   * check whether the explicit user/item filter works
+   */
+  @Test
+  public void testCompleteJobWithFiltering() throws Exception {
+
+    File inputFile = getTestTempFile("prefs.txt");
+    File userFile = getTestTempFile("users.txt");
+    File filterFile = getTestTempFile("filter.txt");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    writeLines(inputFile,
+        "1,1,5",
+        "1,2,5",
+        "1,3,2",
+        "2,1,2",
+        "2,3,3",
+        "2,4,5",
+        "3,2,5",
+        "3,4,3",
+        "4,1,3",
+        "4,4,5");
+
+    /* only compute recommendations for the donkey */
+    writeLines(userFile, "4");
+    /* do not recommend the hotdog for the donkey */
+    writeLines(filterFile, "4,2");
+
+    RecommenderJob recommenderJob = new RecommenderJob();
+
+    Configuration conf = new Configuration();
+    conf.set("mapred.input.dir", inputFile.getAbsolutePath());
+    conf.set("mapred.output.dir", outputDir.getAbsolutePath());
+    conf.setBoolean("mapred.output.compress", false);
+
+    recommenderJob.setConf(conf);
+
+    recommenderJob.run(new String[] { "--tempDir", tmpDir.getAbsolutePath(), "--similarityClassname",
+        DistributedTanimotoCoefficientVectorSimilarity.class.getName(), "--numRecommendations", "1",
+        "--usersFile", userFile.getAbsolutePath(), "--filterFile", filterFile.getAbsolutePath() });
+
+    Map<Long,List<RecommendedItem>> recommendations = readRecommendations(new File(outputDir, "part-r-00000"));
+
+    assertEquals(1, recommendations.size());
+    assertTrue(recommendations.containsKey(4L));
+    assertEquals(1, recommendations.get(4L).size());
+
+    /* the berries should have been recommended to the donkey */
+    RecommendedItem recommendedItem = recommendations.get(4L).get(0);
+    assertEquals(3L, recommendedItem.getItemID());
+    assertEquals(3.5, recommendedItem.getValue(), 0.05);
+  }
+
   static Map<Long,List<RecommendedItem>> readRecommendations(File file) throws IOException {
     Map<Long,List<RecommendedItem>> recommendations = new HashMap<Long,List<RecommendedItem>>();
     Iterable<String> lineIterable = new FileLineIterable(file);