You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/05/26 20:59:03 UTC

svn commit: r948538 [1/2] - in /mahout/trunk: core/src/main/java/org/apache/mahout/cf/taste/hadoop/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ core/src/main/java/org/apache/ma...

Author: srowen
Date: Wed May 26 18:59:02 2010
New Revision: 948538

URL: http://svn.apache.org/viewvc?rev=948538&view=rev
Log:
Recommender and related jobs now exclusively use new Hadoop 0.20.x+ APIs

Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Wed May 26 18:59:02 2010
@@ -17,20 +17,17 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
 import org.apache.mahout.math.VarLongWritable;
 
 import java.io.IOException;
 import java.util.regex.Pattern;
 
-public abstract class ToEntityPrefsMapper extends MapReduceBase implements
+public abstract class ToEntityPrefsMapper extends
     Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
 
   public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
@@ -46,7 +43,8 @@ public abstract class ToEntityPrefsMappe
   }
 
   @Override
-  public void configure(JobConf jobConf) {
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
     transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false);
   }
@@ -54,8 +52,7 @@ public abstract class ToEntityPrefsMappe
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<VarLongWritable,VarLongWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
     String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString());
     long userID = Long.parseLong(tokens[0]);
     long itemID = Long.parseLong(tokens[1]);
@@ -68,10 +65,10 @@ public abstract class ToEntityPrefsMappe
       itemID = temp;
     }
     if (booleanData) {
-      output.collect(new VarLongWritable(userID), new VarLongWritable(itemID));
+      context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
     } else {
       float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
-      output.collect(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
+      context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Wed May 26 18:59:02 2010
@@ -23,7 +23,8 @@ import org.apache.hadoop.io.Text;
  * <h1>Input</h1>
  * 
  * <p>
- * Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts line number / line pairs as
+ * Intended for use with {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat};
+ * accepts line number / line pairs as
  * {@link org.apache.mahout.math.VarLongWritable}/{@link Text} pairs.
  * </p>
  * 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Wed May 26 18:59:02 2010
@@ -25,16 +25,13 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
@@ -45,7 +42,7 @@ import org.apache.mahout.math.VectorWrit
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
-public final class AggregateAndRecommendReducer extends MapReduceBase implements
+public final class AggregateAndRecommendReducer extends
     Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
 
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
@@ -62,7 +59,8 @@ public final class AggregateAndRecommend
   private OpenIntLongHashMap indexItemIDMap;
 
   @Override
-  public void configure(JobConf jobConf) {
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
     try {
       FileSystem fs = FileSystem.get(jobConf);
@@ -86,15 +84,17 @@ public final class AggregateAndRecommend
 
   @Override
   public void reduce(VarLongWritable key,
-                     Iterator<VectorWritable> values,
-                     OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
-                     Reporter reporter) throws IOException {
-    if (!values.hasNext()) {
-      return;
+                     Iterable<VectorWritable> values,
+                     Context context) throws IOException, InterruptedException {
+
+    Vector recommendationVector = null;
+    for (VectorWritable vectorWritable : values) {
+      recommendationVector = recommendationVector == null ?
+          vectorWritable.get() :
+          recommendationVector.plus(vectorWritable.get());
     }
-    Vector recommendationVector = values.next().get();
-    while (values.hasNext()) {
-      recommendationVector = recommendationVector.plus(values.next().get());
+    if (recommendationVector == null) {
+      return;
     }
 
     Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
@@ -120,7 +120,7 @@ public final class AggregateAndRecommend
       List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
       recommendations.addAll(topItems);
       Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
-      output.collect(key, new RecommendedItemsWritable(recommendations));
+      context.write(key, new RecommendedItemsWritable(recommendations));
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Wed May 26 18:59:02 2010
@@ -18,32 +18,28 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-public final class AggregateCombiner extends MapReduceBase implements
+public final class AggregateCombiner extends
     Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {
 
   @Override
   public void reduce(VarLongWritable key,
-                     Iterator<VectorWritable> values,
-                     OutputCollector<VarLongWritable,VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    if (values.hasNext()) {
-      Vector partial = values.next().get();
-      while (values.hasNext()) {
-        partial = partial.plus(values.next().get());
-      }
+                     Iterable<VectorWritable> values,
+                     Context context) throws IOException, InterruptedException {
+
+    Vector partial = null;
+    for (VectorWritable vectorWritable : values) {
+      partial = partial == null ? vectorWritable.get() : partial.plus(vectorWritable.get());
+    }
+    if (partial != null) {
       VectorWritable vw = new VectorWritable(partial);
       vw.setWritesLaxPrecision(true);
-      output.collect(key, vw);
+      context.write(key, vw);
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Wed May 26 18:59:02 2010
@@ -19,22 +19,18 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VectorWritable;
 
-public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
-    Mapper<VarIntWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
+public final class CooccurrenceColumnWrapperMapper extends
+    Mapper<VarIntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
 
   @Override
   public void map(VarIntWritable key,
                   VectorWritable value,
-                  OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
-                  Reporter reporter) throws IOException {
-    output.collect(key, new VectorOrPrefWritable(value.get()));
+                  Context context) throws IOException, InterruptedException {
+    context.write(key, new VectorOrPrefWritable(value.get()));
   }
 
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Wed May 26 18:59:02 2010
@@ -20,18 +20,15 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 
-public final class ItemIDIndexMapper extends MapReduceBase implements
+public final class ItemIDIndexMapper extends
     Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
   
   private static final Pattern COMMA = Pattern.compile(",");
@@ -39,19 +36,19 @@ public final class ItemIDIndexMapper ext
   private boolean transpose;
 
   @Override
-  public void configure(JobConf jobConf) {
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
   }
   
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<VarIntWritable,VarLongWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
     String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
     long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
     int index = idToIndex(itemID);
-    output.collect(new VarIntWritable(index), new VarLongWritable(itemID));
+    context.write(new VarIntWritable(index), new VarLongWritable(itemID));
   }
   
   static int idToIndex(long itemID) {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java Wed May 26 18:59:02 2010
@@ -18,32 +18,27 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 
-public final class ItemIDIndexReducer extends MapReduceBase implements
+public final class ItemIDIndexReducer extends
     Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
   
   @Override
   public void reduce(VarIntWritable index,
-                     Iterator<VarLongWritable> possibleItemIDs,
-                     OutputCollector<VarIntWritable,VarLongWritable> output,
-                     Reporter reporter) throws IOException {
-    if (possibleItemIDs.hasNext()) {
-      long minimumItemID = Long.MAX_VALUE;
-      while (possibleItemIDs.hasNext()) {
-        long itemID = possibleItemIDs.next().get();
-        if (itemID < minimumItemID) {
-          minimumItemID = itemID;
-        }
+                     Iterable<VarLongWritable> possibleItemIDs,
+                     Context context) throws IOException, InterruptedException {
+    long minimumItemID = Long.MAX_VALUE;
+    for (VarLongWritable varLongWritable : possibleItemIDs) {
+      long itemID = varLongWritable.get();
+      if (itemID < minimumItemID) {
+        minimumItemID = itemID;
       }
-      output.collect(index, new VarLongWritable(minimumItemID));
+    }
+    if (minimumItemID != Long.MAX_VALUE) {
+      context.write(index, new VarLongWritable(minimumItemID));
     }
   }
   

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Wed May 26 18:59:02 2010
@@ -20,24 +20,20 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
 
-public final class PartialMultiplyMapper extends MapReduceBase implements
+public final class PartialMultiplyMapper extends
     Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
 
   @Override
   public void map(VarIntWritable key,
                   VectorAndPrefsWritable vectorAndPrefsWritable,
-                  OutputCollector<VarLongWritable, VectorWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
 
     int itemIndex = key.get();
 
@@ -55,7 +51,7 @@ public final class PartialMultiplyMapper
     excludeWritable.setWritesLaxPrecision(true);
     for (long userID : userIDs) {
       userIDWritable.set(userID);
-      output.collect(userIDWritable, excludeWritable);
+      context.write(userIDWritable, excludeWritable);
     }
 
     VectorWritable vectorWritable = new VectorWritable();
@@ -69,7 +65,7 @@ public final class PartialMultiplyMapper
             cooccurrenceColumn.times(prefValue);
         userIDWritable.set(userID);
         vectorWritable.set(partialProduct);
-        output.collect(userIDWritable, vectorWritable);
+        context.write(userIDWritable, vectorWritable);
       }
     }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed May 26 18:59:02 2010
@@ -25,16 +25,14 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
@@ -68,7 +66,7 @@ public final class RecommenderJob extend
   public static final String BOOLEAN_DATA = "booleanData";
   
   @Override
-  public int run(String[] args) throws IOException {
+  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
     
     Option numReccomendationsOpt = AbstractJob.buildOption("numRecommendations", "n",
       "Number of recommendations per user", "10");
@@ -84,87 +82,97 @@ public final class RecommenderJob extend
     }
     
     Configuration originalConf = getConf();
-    String inputPath = originalConf.get("mapred.input.dir");
-    String outputPath = originalConf.get("mapred.output.dir");
-    String tempDirPath = parsedArgs.get("--tempDir");
+    Path inputPath = new Path(originalConf.get("mapred.input.dir"));
+    Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
     int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     String usersFile = parsedArgs.get("--usersFile");
     boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
     
-    String userVectorPath = tempDirPath + "/userVectors";
-    String itemIDIndexPath = tempDirPath + "/itemIDIndex";
-    String cooccurrencePath = tempDirPath + "/cooccurrence";
-    String partialMultiplyPath = tempDirPath + "/partialMultiply";
+    Path userVectorPath = new Path(tempDirPath, "userVectors");
+    Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
+    Path cooccurrencePath = new Path(tempDirPath, "cooccurrence");
+    Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
+    Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
+    Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
 
     AtomicInteger currentPhase = new AtomicInteger();
 
-    JobConf itemIDIndexConf = prepareJobConf(
-      inputPath, itemIDIndexPath, TextInputFormat.class,
-      ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
-      ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
-      SequenceFileOutputFormat.class);
-    itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      JobClient.runJob(itemIDIndexConf);
+      Job itemIDIndex = prepareJob(
+        inputPath, itemIDIndexPath, TextInputFormat.class,
+        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
+        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
+        SequenceFileOutputFormat.class);
+      itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+      itemIDIndex.waitForCompletion(true);
     }
-    
-    JobConf toUserVectorConf = prepareJobConf(
-      inputPath, userVectorPath, TextInputFormat.class,
-      ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
-      ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
-      SequenceFileOutputFormat.class);
-    toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
+
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      JobClient.runJob(toUserVectorConf);
+      Job toUserVector = prepareJob(
+        inputPath, userVectorPath, TextInputFormat.class,
+        ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+        ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
+        SequenceFileOutputFormat.class);
+      toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData);
+      toUserVector.waitForCompletion(true);
     }
 
-    JobConf toCooccurrenceConf = prepareJobConf(
-      userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
-      UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
-      UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
-      SequenceFileOutputFormat.class);
-    setIOSort(toCooccurrenceConf);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      JobClient.runJob(toCooccurrenceConf);
+      Job toCooccurrence = prepareJob(
+        userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
+        UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
+        UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
+        SequenceFileOutputFormat.class);
+      setIOSort(toCooccurrence);
+      toCooccurrence.waitForCompletion(true);
     }
 
-    JobConf partialMultiplyConf = prepareJobConf(
-      cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
-      CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
-      ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
-      SequenceFileOutputFormat.class);
-    MultipleInputs.addInputPath(
-        partialMultiplyConf,
-        new Path(cooccurrencePath).makeQualified(FileSystem.get(partialMultiplyConf)),
-        SequenceFileInputFormat.class, CooccurrenceColumnWrapperMapper.class);
-    MultipleInputs.addInputPath(
-        partialMultiplyConf,
-        new Path(userVectorPath).makeQualified(FileSystem.get(partialMultiplyConf)),
-        SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
-    if (usersFile != null) {
-      partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
-    }
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      JobClient.runJob(partialMultiplyConf);
+      Job prePartialMultiply1 = prepareJob(
+        cooccurrencePath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
+        CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        SequenceFileOutputFormat.class);
+      prePartialMultiply1.waitForCompletion(true);
+
+      Job prePartialMultiply2 = prepareJob(
+        userVectorPath, prePartialMultiplyPath2, SequenceFileInputFormat.class,
+        UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        SequenceFileOutputFormat.class);
+      if (usersFile != null) {
+        prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
+      }
+      prePartialMultiply2.waitForCompletion(true);
+
+      Job partialMultiply = prepareJob(
+        new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath, SequenceFileInputFormat.class,
+        Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+        ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+        SequenceFileOutputFormat.class);
+      partialMultiply.waitForCompletion(true);
     }
 
-    JobConf aggregateAndRecommendConf = prepareJobConf(
-        partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
-        PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
-        AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
-        TextOutputFormat.class);
-    setIOSort(aggregateAndRecommendConf);
-    aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
-    aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
-    aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      JobClient.runJob(aggregateAndRecommendConf);
+      Job aggregateAndRecommend = prepareJob(
+          partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+          PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+          AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
+          TextOutputFormat.class);
+      Configuration jobConf = aggregateAndRecommend.getConfiguration();
+      setIOSort(aggregateAndRecommend);
+      aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
+      jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+      jobConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+      aggregateAndRecommend.waitForCompletion(true);
     }
 
     return 0;
   }
 
-  private static void setIOSort(JobConf conf) {
+  private static void setIOSort(Job job) {
+    Configuration conf = job.getConfiguration();
     conf.setInt("io.sort.factor", 100);
     int assumedHeapSize = 512;
     String javaOpts = conf.get("mapred.child.java.opts");

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Wed May 26 18:59:02 2010
@@ -18,12 +18,8 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarLongWritable;
@@ -47,20 +43,15 @@ import org.apache.mahout.math.VectorWrit
  * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
  * </p>
  */
-public final class ToUserVectorReducer extends MapReduceBase implements
+public final class ToUserVectorReducer extends
     Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
   
   @Override
   public void reduce(VarLongWritable userID,
-                     Iterator<VarLongWritable> itemPrefs,
-                     OutputCollector<VarLongWritable,VectorWritable> output,
-                     Reporter reporter) throws IOException {
-    if (!itemPrefs.hasNext()) {
-      return;
-    }
+                     Iterable<VarLongWritable> itemPrefs,
+                     Context context) throws IOException, InterruptedException {
     Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-    while (itemPrefs.hasNext()) {
-      VarLongWritable itemPref = itemPrefs.next();
+    for (VarLongWritable itemPref : itemPrefs) {
       int index = ItemIDIndexMapper.idToIndex(itemPref.get());
       float value;
       if (itemPref instanceof EntityPrefWritable) {
@@ -73,7 +64,7 @@ public final class ToUserVectorReducer e
 
     VectorWritable vw = new VectorWritable(userVector);
     vw.setWritesLaxPrecision(true);
-    output.collect(userID, vw);
+    context.write(userID, vw);
   }
   
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Wed May 26 18:59:02 2010
@@ -19,30 +19,24 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
 
-public final class ToVectorAndPrefReducer extends MapReduceBase implements
+public final class ToVectorAndPrefReducer extends
     Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
 
   @Override
   public void reduce(VarIntWritable key,
-                     Iterator<VectorOrPrefWritable> values,
-                     OutputCollector<VarIntWritable,VectorAndPrefsWritable> output,
-                     Reporter reporter) throws IOException {
+                     Iterable<VectorOrPrefWritable> values,
+                     Context context) throws IOException, InterruptedException {
 
     List<Long> userIDs = new ArrayList<Long>();
     List<Float> prefValues = new ArrayList<Float>();
     Vector cooccurrenceColumn = null;
-    while (values.hasNext()) {
-      VectorOrPrefWritable value = values.next();
+    for (VectorOrPrefWritable value : values) {
       if (value.getVector() == null) {
         // Then this is a user-pref value
         userIDs.add(value.getUserID());
@@ -61,7 +55,7 @@ public final class ToVectorAndPrefReduce
     }
 
     VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
-    output.collect(key, vectorAndPrefs);
+    context.write(key, vectorAndPrefs);
   }
 
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Wed May 26 18:59:02 2010
@@ -21,14 +21,11 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.common.FileLineIterable;
 import org.apache.mahout.math.VarIntWritable;
@@ -36,7 +33,7 @@ import org.apache.mahout.math.VarLongWri
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-public final class UserVectorSplitterMapper extends MapReduceBase implements
+public final class UserVectorSplitterMapper extends
     Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
 
   private static final int MAX_PREFS_CONSIDERED = 10;  
@@ -45,7 +42,8 @@ public final class UserVectorSplitterMap
   private FastIDSet usersToRecommendFor;
 
   @Override
-  public void configure(JobConf jobConf) {
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     try {
       FileSystem fs = FileSystem.get(jobConf);
       String usersFilePathString = jobConf.get(USERS_FILE);
@@ -67,8 +65,7 @@ public final class UserVectorSplitterMap
   @Override
   public void map(VarLongWritable key,
                   VectorWritable value,
-                  OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
     long userID = key.get();
     if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
       return;
@@ -81,7 +78,7 @@ public final class UserVectorSplitterMap
       Vector.Element e = it.next();
       itemIndexWritable.set(e.index());
       vectorOrPref.set(userID, (float) e.get());
-      output.collect(itemIndexWritable, vectorOrPref);
+      context.write(itemIndexWritable, vectorOrPref);
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Wed May 26 18:59:02 2010
@@ -22,17 +22,14 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.PriorityQueue;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.map.OpenIntIntHashMap;
 
-public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
+public final class UserVectorToCooccurrenceMapper extends
     Mapper<VarLongWritable,VectorWritable,VarIntWritable,VarIntWritable> {
 
   private static final int MAX_PREFS_CONSIDERED = 100;
@@ -46,8 +43,7 @@ public final class UserVectorToCooccurre
   @Override
   public void map(VarLongWritable userID,
                   VectorWritable userVectorWritable,
-                  OutputCollector<VarIntWritable,VarIntWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
 
     Vector userVector = userVectorWritable.get();
     countSeen(userVector);
@@ -56,7 +52,7 @@ public final class UserVectorToCooccurre
     userVector = maybePruneUserVector(userVector);
     int newSize = userVector.getNumNondefaultElements();
     if (newSize < originalSize) {
-      reporter.incrCounter(Counters.USER_PREFS_SKIPPED, originalSize - newSize);
+      context.getCounter(Counters.USER_PREFS_SKIPPED).increment(originalSize - newSize);
     }
 
     Iterator<Vector.Element> it = userVector.iterateNonZero();
@@ -69,7 +65,7 @@ public final class UserVectorToCooccurre
       while (it2.hasNext()) {
         int index2 = it2.next().index();
         itemIndex2.set(index2);
-        output.collect(itemIndex1, itemIndex2);
+        context.write(itemIndex1, itemIndex2);
       }
     }
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Wed May 26 18:59:02 2010
@@ -18,33 +18,28 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
+public final class UserVectorToCooccurrenceReducer extends
     Reducer<VarIntWritable,VarIntWritable,VarIntWritable,VectorWritable> {
 
   @Override
   public void reduce(VarIntWritable itemIndex1,
-                     Iterator<VarIntWritable> itemIndex2s,
-                     OutputCollector<VarIntWritable,VectorWritable> output,
-                     Reporter reporter) throws IOException {
+                     Iterable<VarIntWritable> itemIndex2s,
+                     Context context) throws IOException, InterruptedException {
     Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-    while (itemIndex2s.hasNext()) {
-      int itemIndex2 = itemIndex2s.next().get();
+    for (VarIntWritable varIntWritable : itemIndex2s) {
+      int itemIndex2 = varIntWritable.get();
       cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
     }
     VectorWritable vw = new VectorWritable(cooccurrenceRow);
     vw.setWritesLaxPrecision(true);
-    output.collect(itemIndex1, vw);
+    context.write(itemIndex1, vw);
   }
   
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Wed May 26 18:59:02 2010
@@ -22,13 +22,14 @@ import java.util.Map;
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -103,7 +104,7 @@ import org.apache.mahout.math.VarLongWri
 public final class RecommenderJob extends AbstractJob {
   
   @Override
-  public int run(String[] args) throws IOException {
+  public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
     
     Option recommendClassOpt = AbstractJob.buildOption("recommenderClassName", "r",
       "Name of recommender class to instantiate");
@@ -119,26 +120,35 @@ public final class RecommenderJob extend
     }
 
     Configuration originalConf = getConf();
-    String inputFile = originalConf.get("mapred.input.dir");
-    String outputPath = originalConf.get("mapred.output.dir");
-    String usersFile = parsedArgs.get("--usersFile");
-    if (usersFile == null) {
+    Path inputFile = new Path(originalConf.get("mapred.input.dir"));
+    Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+    Path usersFile;
+    if (parsedArgs.get("--usersFile") == null) {
       usersFile = inputFile;
+    } else {
+      usersFile = new Path(parsedArgs.get("--usersFile"));
     }
     
     String recommendClassName = parsedArgs.get("--recommenderClassName");
     int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     
-    JobConf jobConf = prepareJobConf(usersFile, outputPath, TextInputFormat.class,
-      UserIDsMapper.class, VarLongWritable.class, NullWritable.class, RecommenderReducer.class,
-      VarLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
-    
+    Job job = prepareJob(usersFile,
+                         outputPath,
+                         TextInputFormat.class,
+                         UserIDsMapper.class,
+                         VarLongWritable.class,
+                         NullWritable.class,
+                         RecommenderReducer.class,
+                         VarLongWritable.class,
+                         RecommendedItemsWritable.class,
+                         TextOutputFormat.class);
+    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+    Configuration jobConf = job.getConfiguration();
     jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
     jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
-    jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile);
-    jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-    
-    JobClient.runJob(jobConf);
+    jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile.toString());
+
+    job.waitForCompletion(true);
     return 0;
   }
   

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Wed May 26 18:59:02 2010
@@ -24,14 +24,11 @@ import java.lang.reflect.InvocationTarge
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.common.TasteException;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
@@ -49,7 +46,7 @@ import org.apache.mahout.math.VarLongWri
  * 
  * @see RecommenderJob
  */
-public final class RecommenderReducer extends MapReduceBase implements
+public final class RecommenderReducer extends
     Reducer<VarLongWritable,NullWritable,VarLongWritable,RecommendedItemsWritable> {
   
   static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
@@ -60,7 +57,8 @@ public final class RecommenderReducer ex
   private int recommendationsPerUser;
 
   @Override
-  public void configure(JobConf jobConf) {
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     String dataModelFile = jobConf.get(DATA_MODEL_FILE);
     String recommenderClassName = jobConf.get(RECOMMENDER_CLASS_NAME);
     FileDataModel fileDataModel;
@@ -95,9 +93,8 @@ public final class RecommenderReducer ex
   
   @Override
   public void reduce(VarLongWritable key,
-                     Iterator<NullWritable> values,
-                     OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
-                     Reporter reporter) throws IOException {
+                     Iterable<NullWritable> values,
+                     Context context) throws IOException, InterruptedException {
     long userID = key.get();
     List<RecommendedItem> recommendedItems;
     try {
@@ -112,8 +109,8 @@ public final class RecommenderReducer ex
       }
     }
     RecommendedItemsWritable writable = new RecommendedItemsWritable(recommendedItems);
-    output.collect(key, writable);
-    reporter.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
-    reporter.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
+    context.write(key, writable);
+    context.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
+    context.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Wed May 26 18:59:02 2010
@@ -22,27 +22,23 @@ import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VarLongWritable;
 
 /**
  * Extracts and emits all user IDs from the users file, or input file.
  */
-public final class UserIDsMapper extends MapReduceBase implements
+public final class UserIDsMapper extends
     Mapper<LongWritable,Text, VarLongWritable,NullWritable> {
   
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<VarLongWritable,NullWritable> output,
-                  Reporter reporter) throws IOException {
+                  Context context) throws IOException, InterruptedException {
     String line = value.toString();
     int comma = line.indexOf(',');
     long userID = comma >= 0 ? Long.parseLong(line.substring(0, comma)) : Long.parseLong(line);
-    output.collect(new VarLongWritable(userID), NullWritable.get());
+    context.write(new VarLongWritable(userID), NullWritable.get());
   }
   
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 /**
  * abstract base class for all implementations of {@link DistributedItemSimilarity} that does not give a
  * weight to item vectors and only ensures that the result is within [-1,1]
@@ -27,9 +25,10 @@ public abstract class AbstractDistribute
     implements DistributedItemSimilarity {
 
   @Override
-  public final double similarity(Iterator<CoRating> coratings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
+  public final double similarity(Iterable<CoRating> coratings,
+                                 double weightOfItemVectorX,
+                                 double weightOfItemVectorY,
+                                 int numberOfUsers) {
 
     double result = doComputeResult(coratings, weightOfItemVectorX, weightOfItemVectorY, numberOfUsers);
 
@@ -46,11 +45,11 @@ public abstract class AbstractDistribute
    * when they need a weight
    */
   @Override
-  public double weightOfItemVector(Iterator<Float> prefValues) {
+  public double weightOfItemVector(Iterable<Float> prefValues) {
     return Double.NaN;
   }
 
-  protected abstract double doComputeResult(Iterator<CoRating> coratings,
+  protected abstract double doComputeResult(Iterable<CoRating> coratings,
       double weightOfItemVectorX, double weightOfItemVectorY,
       int numberOfUsers);
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
 
 /**
@@ -27,15 +25,15 @@ import org.apache.mahout.cf.taste.impl.s
 public class DistributedEuclideanDistanceSimilarity extends AbstractDistributedItemSimilarity {
 
   @Override
-  protected double doComputeResult(Iterator<CoRating> coratings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
+  protected double doComputeResult(Iterable<CoRating> coratings,
+                                   double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+                                   int numberOfUsers) {
 
     double n=0;
     double sumXYdiff2 = 0;
 
-    while (coratings.hasNext()) {
-      CoRating coRating = coratings.next();
+    for (CoRating coRating : coratings) {
       double diff = coRating.getPrefValueX() - coRating.getPrefValueY();
       sumXYdiff2 += diff * diff;
       n++;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 /**
  * Modelling the pairwise similarity computation of items in a distributed manner
  */
@@ -30,7 +28,7 @@ public interface DistributedItemSimilari
    * @param prefValues
    * @return
    */
-  double weightOfItemVector(Iterator<Float> prefValues);
+  double weightOfItemVector(Iterable<Float> prefValues);
 
   /**
    * compute the similarity for a pair of item-vectors
@@ -41,7 +39,7 @@ public interface DistributedItemSimilari
    * @param numberOfUsers the overall number of users
    * @return
    */
-  double similarity(Iterator<CoRating> coratings,
+  double similarity(Iterable<CoRating> coratings,
                     double weightOfItemVectorX,
                     double weightOfItemVectorY,
                     int numberOfUsers);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
 
 /**
@@ -27,13 +25,13 @@ import org.apache.mahout.cf.taste.impl.s
 public class DistributedLogLikelihoodSimilarity extends AbstractDistributedItemSimilarity {
 
   @Override
-  protected double doComputeResult(Iterator<CoRating> coratings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
+  protected double doComputeResult(Iterable<CoRating> coratings,
+                                   double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+                                   int numberOfUsers) {
 
     int preferringXandY = 0;
-    while (coratings.hasNext()) {
-      coratings.next();
+    for (CoRating corating : coratings) {
       preferringXandY++;
     }
 
@@ -53,10 +51,9 @@ public class DistributedLogLikelihoodSim
   }
 
   @Override
-  public double weightOfItemVector(Iterator<Float> prefValues) {
-    double nonZeroEntries = 0;
-    while (prefValues.hasNext()) {
-      prefValues.next();
+  public double weightOfItemVector(Iterable<Float> prefValues) {
+    int nonZeroEntries = 0;
+    for (Float prefValue : prefValues) {
       nonZeroEntries++;
     }
     return nonZeroEntries;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
 
 /**
@@ -27,9 +25,10 @@ import org.apache.mahout.cf.taste.impl.s
 public class DistributedPearsonCorrelationSimilarity extends AbstractDistributedItemSimilarity {
 
   @Override
-  protected double doComputeResult(Iterator<CoRating> coRatings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
+  protected double doComputeResult(Iterable<CoRating> coRatings,
+                                   double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+                                   int numberOfUsers) {
 
     int count = 0;
     double sumX = 0.0;
@@ -38,8 +37,7 @@ public class DistributedPearsonCorrelati
     double sumX2 = 0.0;
     double sumY2 = 0.0;
 
-    while (coRatings.hasNext()) {
-      CoRating coRating = coRatings.next();
+    for (CoRating coRating : coRatings) {
       double x = coRating.getPrefValueX();
       double y = coRating.getPrefValueY();
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 import org.apache.mahout.cf.taste.impl.similarity.TanimotoCoefficientSimilarity;
 
 /**
@@ -27,13 +25,13 @@ import org.apache.mahout.cf.taste.impl.s
 public class DistributedTanimotoCoefficientSimilarity extends AbstractDistributedItemSimilarity {
 
 	@Override
-	protected double doComputeResult(Iterator<CoRating> coratings,
-			double weightOfItemVectorX, double weightOfItemVectorY,
-			int numberOfUsers) {
-
-	  double preferringXAndY = 0;
-	  while (coratings.hasNext()) {
-	    coratings.next();
+	protected double doComputeResult(Iterable<CoRating> coratings,
+			                             double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+			                             int numberOfUsers) {
+
+	  int preferringXAndY = 0;
+	  for (CoRating coRating : coratings) {
 	    preferringXAndY++;
 	  }
 
@@ -45,10 +43,9 @@ public class DistributedTanimotoCoeffici
 	}
 
 	@Override
-	public double weightOfItemVector(Iterator<Float> prefValues) {
-		double nonZeroEntries = 0;
-		while (prefValues.hasNext()) {
-		  prefValues.next();
+	public double weightOfItemVector(Iterable<Float> prefValues) {
+		int nonZeroEntries = 0;
+		for (Float prefValue : prefValues) {
 		  nonZeroEntries++;
 		}
 		return nonZeroEntries;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.cf.taste.hadoop.similarity;
 
-import java.util.Iterator;
-
 import org.apache.mahout.cf.taste.impl.similarity.UncenteredCosineSimilarity;
 
 /**
@@ -27,17 +25,17 @@ import org.apache.mahout.cf.taste.impl.s
 public class DistributedUncenteredCosineSimilarity extends AbstractDistributedItemSimilarity {
 
   @Override
-  protected double doComputeResult(Iterator<CoRating> coratings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
+  protected double doComputeResult(Iterable<CoRating> coratings,
+                                   double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+                                   int numberOfUsers) {
 
     int n = 0;
-    double sumXY = 0;
-    double sumX2 = 0;
-    double sumY2 = 0;
+    double sumXY = 0.0;
+    double sumX2 = 0.0;
+    double sumY2 = 0.0;
 
-    while (coratings.hasNext()) {
-      CoRating coRating = coratings.next();
+    for (CoRating coRating : coratings) {
       double x = coRating.getPrefValueX();
       double y = coRating.getPrefValueY();
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java Wed May 26 18:59:02 2010
@@ -27,27 +27,26 @@ public final class DistributedUncentered
     extends AbstractDistributedItemSimilarity {
 
   @Override
-  protected double doComputeResult(Iterator<CoRating> coRatings,
-      double weightOfItemVectorX, double weightOfItemVectorY,
-      int numberOfUsers) {
-
-    double sumXY = 0;
-    while (coRatings.hasNext()) {
-      CoRating coRating = coRatings.next();
+  protected double doComputeResult(Iterable<CoRating> coRatings,
+                                   double weightOfItemVectorX,
+                                   double weightOfItemVectorY,
+                                   int numberOfUsers) {
+
+    double sumXY = 0.0; // dot product of the paired (X, Y) preference values
+    for (CoRating coRating : coRatings) {
       sumXY += coRating.getPrefValueX() * coRating.getPrefValueY();
     }
 
-    if (sumXY == 0) {
+    if (sumXY == 0.0) { // zero dot product (no co-ratings, or products cancel): result is undefined
       return Double.NaN;
     }
     return sumXY / (weightOfItemVectorX * weightOfItemVectorY);
   }
 
   @Override
-  public double weightOfItemVector(Iterator<Float> prefValues) {
+  public double weightOfItemVector(Iterable<Float> prefValues) {
     double length = 0.0;
-    while (prefValues.hasNext()) {
-      float prefValue = prefValues.next();
+    for (float prefValue : prefValues) {
       if (!Float.isNaN(prefValue)) {
         length += prefValue * prefValue;
       }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java Wed May 26 18:59:02 2010
@@ -19,10 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
 import org.apache.mahout.math.VarLongWritable;
 
@@ -30,15 +27,14 @@ import org.apache.mahout.math.VarLongWri
  * map out each pair of items that appears in the same user-vector together with the multiplied vector lengths
  * of the associated item vectors
  */
-public final class CopreferredItemsMapper extends MapReduceBase
-    implements Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
+public final class CopreferredItemsMapper extends
+    Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
 
   @Override
   public void map(VarLongWritable user,
                   ItemPrefWithItemVectorWeightArrayWritable itemPrefsArray,
-                  OutputCollector<ItemPairWritable, CoRating> output,
-                  Reporter reporter)
-      throws IOException {
+                  Context context)
+      throws IOException, InterruptedException {
 
     ItemPrefWithItemVectorWeightWritable[] itemPrefs = itemPrefsArray.getItemPrefs();
 
@@ -52,7 +48,7 @@ public final class CopreferredItemsMappe
         long itemAID = Math.min(itemNID, itemM.getItemID());
         long itemBID = Math.max(itemNID, itemM.getItemID());
         ItemPairWritable pair = new ItemPairWritable(itemAID, itemBID, itemNWeight, itemM.getWeight());
-        output.collect(pair, new CoRating(itemNValue, itemM.getPrefValue()));
+        context.write(pair, new CoRating(itemNValue, itemM.getPrefValue()));
       }
     }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java Wed May 26 18:59:02 2010
@@ -22,10 +22,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.mahout.common.RandomUtils;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Varint;
@@ -81,15 +81,13 @@ public class CountUsersKeyWritable imple
   /**
    * all userIDs go to the same partition
    */
-  public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VarLongWritable> {
+  public static class CountUsersPartitioner extends Partitioner<CountUsersKeyWritable,VarLongWritable> { // partition 0 only: one reduce task sees every userID
 
     @Override
     public int getPartition(CountUsersKeyWritable key, VarLongWritable value, int numPartitions) {
       return 0;
     }
 
-    @Override
-    public void configure(JobConf conf) {}
   }
 
   /**

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java Wed May 26 18:59:02 2010
@@ -22,29 +22,25 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VarLongWritable;
 
 /**
- * maps out the userIDs in a way that we can use a secondary sort on them
+ * Emits each line's userID (its first tab- or comma-separated field) as both key and value, so that a secondary sort can be applied to the IDs
  */
-public class CountUsersMapper extends MapReduceBase
-    implements Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
+public class CountUsersMapper extends
+    Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
 
   private static final Pattern DELIMITER = Pattern.compile("[\t,]");
 
   @Override
-  public void map(LongWritable arg0, Text value,
-      OutputCollector<CountUsersKeyWritable,VarLongWritable> out, Reporter reporter)
-      throws IOException {
+  public void map(LongWritable key, Text value, Context context)
+      throws IOException, InterruptedException {
 
     String[] tokens = DELIMITER.split(value.toString());
     long userID = Long.parseLong(tokens[0]);
 
-    out.collect(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
+    context.write(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java Wed May 26 18:59:02 2010
@@ -18,13 +18,9 @@
 package org.apache.mahout.cf.taste.hadoop.similarity.item;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 
@@ -32,25 +28,26 @@ import org.apache.mahout.math.VarLongWri
  * counts all unique users, we ensure that we see userIDs sorted in ascending order via
  * secondary sort, so we don't have to buffer all of them
  */
-public class CountUsersReducer extends MapReduceBase
-    implements Reducer<CountUsersKeyWritable,VarLongWritable, VarIntWritable,NullWritable> {
+public class CountUsersReducer extends
+    Reducer<CountUsersKeyWritable,VarLongWritable, VarIntWritable,NullWritable> {
 
   @Override
-  public void reduce(CountUsersKeyWritable key, Iterator<VarLongWritable> userIDs,
-      OutputCollector<VarIntWritable,NullWritable> out, Reporter reporter)
-      throws IOException {
+  public void reduce(CountUsersKeyWritable key,
+                     Iterable<VarLongWritable> userIDs,
+                     Context context)
+      throws IOException, InterruptedException {
 
     long lastSeenUserID = Long.MIN_VALUE;
     int numberOfUsers = 0;
 
-    while (userIDs.hasNext()) {
-      long currentUserID = userIDs.next().get();
-      if (currentUserID > lastSeenUserID) {
+    for (VarLongWritable writable : userIDs) {
+      long currentUserID = writable.get();
+      if (numberOfUsers == 0 || currentUserID > lastSeenUserID) { // always count the first ID, so a userID equal to the Long.MIN_VALUE sentinel is not dropped
         lastSeenUserID = currentUserID;
         numberOfUsers++;
       }
     }
-    out.collect(new VarIntWritable(numberOfUsers), NullWritable.get());
+    context.write(new VarIntWritable(numberOfUsers), NullWritable.get());
   }
 
 }