Posted to commits@mahout.apache.org by sr...@apache.org on 2010/05/25 00:44:53 UTC

svn commit: r947844 [1/2] - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/cf/taste/hadoop/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/ core/src/main/java/org/apache...

Author: srowen
Date: Mon May 24 22:44:51 2010
New Revision: 947844

URL: http://svn.apache.org/viewvc?rev=947844&view=rev
Log:
Big commit for the item-based distributed recommender before it gets bigger. Do away with the combiner in the co-occurrence phase, as it hurt performance. Use the new varints everywhere. And more.
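
For readers skimming the diffs below: the recurring change is swapping Hadoop's VLongWritable/WritableUtils varint helpers for Mahout's own org.apache.mahout.math.Varint, VarIntWritable, and VarLongWritable. A minimal sketch of a Writable built on those helpers, mirroring the EntityCountWritable change further down; the class name ExampleCountWritable is illustrative only and is not part of this commit:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.mahout.math.Varint;

    // Illustrative sketch, not part of this commit: serializes a signed long ID
    // and a non-negative int count with Mahout's variable-length encoding.
    public final class ExampleCountWritable implements Writable {

      private long id;
      private int count;

      @Override
      public void write(DataOutput out) throws IOException {
        Varint.writeSignedVarLong(id, out);      // small magnitudes take few bytes
        Varint.writeUnsignedVarInt(count, out);  // counts are never negative
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        id = Varint.readSignedVarLong(in);
        count = Varint.readUnsignedVarInt(in);
      }
    }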

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
      - copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
      - copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
      - copied, changed from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/VarIntWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/VarLongWritable.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
Modified:
    mahout/trunk/   (props changed)
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/Gram.java
    mahout/trunk/utils/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/GramKey.java

Propchange: mahout/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 24 22:44:51 2010
@@ -11,3 +11,5 @@ dist
 atlassian-ide-plugin.xml
 target
 input
+
+.idea

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,12 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Varint;
 
 /** A {@link Writable} encapsulating an item ID and a count . */
-public final class EntityCountWritable extends VLongWritable implements Cloneable {
+public final class EntityCountWritable extends VarLongWritable implements Cloneable {
 
   private int count;
 
@@ -59,13 +59,13 @@ public final class EntityCountWritable e
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    WritableUtils.writeVInt(out, count);
+    Varint.writeUnsignedVarInt(count, out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    count = WritableUtils.readVInt(in);
+    count = Varint.readUnsignedVarInt(in);
   }
 
   @Override

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,8 +22,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Varint;
 
 /** A {@link WritableComparable} encapsulating two items. */
 public final class EntityEntityWritable
@@ -56,14 +56,14 @@ public final class EntityEntityWritable
   
   @Override
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVLong(out, aID);
-    WritableUtils.writeVLong(out, bID);
+    Varint.writeSignedVarLong(aID, out);
+    Varint.writeSignedVarLong(bID, out);
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
-    aID = WritableUtils.readVLong(in);
-    bID = WritableUtils.readVLong(in);
+    aID = Varint.readSignedVarLong(in);
+    bID = Varint.readSignedVarLong(in);
   }
   
   @Override

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,12 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.VarLongWritable;
 
 /** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends VLongWritable implements Cloneable {
+public final class EntityPrefWritable extends VarLongWritable implements Cloneable {
   
   private float prefValue;
   

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.Varint;
 
 /**
  * A {@link Writable} which encapsulates a list of {@link RecommendedItem}s. This is the mapper (and reducer)
@@ -57,7 +57,7 @@ public final class RecommendedItemsWrita
   public void write(DataOutput out) throws IOException {
     out.writeInt(recommended.size());
     for (RecommendedItem item : recommended) {
-      WritableUtils.writeVLong(out, item.getItemID());
+      Varint.writeSignedVarLong(item.getItemID(), out);
       out.writeFloat(item.getValue());
     }
     
@@ -68,7 +68,7 @@ public final class RecommendedItemsWrita
     int size = in.readInt();
     recommended = new ArrayList<RecommendedItem>(size);
     for (int i = 0; i < size; i++) {
-      long itemID = WritableUtils.readVLong(in);
+      long itemID = Varint.readSignedVarLong(in);
       float value = in.readFloat();
       RecommendedItem recommendedItem = new GenericRecommendedItem(itemID, value);
       recommended.add(recommendedItem);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,21 +19,21 @@ package org.apache.mahout.cf.taste.hadoo
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.math.VarLongWritable;
 
 import java.io.IOException;
 import java.util.regex.Pattern;
 
-abstract class ToEntityPrefsMapper extends MapReduceBase implements
-    Mapper<LongWritable,Text,VLongWritable,VLongWritable> {
+public abstract class ToEntityPrefsMapper extends MapReduceBase implements
+    Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
 
-  static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
+  public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
 
   private static final Pattern DELIMITER = Pattern.compile("[\t,]");
 
@@ -54,7 +54,7 @@ abstract class ToEntityPrefsMapper exten
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<VLongWritable,VLongWritable> output,
+                  OutputCollector<VarLongWritable,VarLongWritable> output,
                   Reporter reporter) throws IOException {
     String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString());
     long userID = Long.parseLong(tokens[0]);
@@ -68,11 +68,11 @@ abstract class ToEntityPrefsMapper exten
       itemID = temp;
     }
     if (booleanData) {
-      output.collect(new VLongWritable(userID), new VLongWritable(itemID));
+      output.collect(new VarLongWritable(userID), new VarLongWritable(itemID));
     } else {
       float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
-      output.collect(new VLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
+      output.collect(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
     }
   }
 
-}
\ No newline at end of file
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.Text;
  * 
  * <p>
  * Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts line number / line pairs as
- * {@link org.apache.hadoop.io.VLongWritable}/{@link Text} pairs.
+ * {@link org.apache.mahout.math.VarLongWritable}/{@link Text} pairs.
  * </p>
  * 
  * <p>
@@ -35,7 +35,7 @@ import org.apache.hadoop.io.Text;
  * <h1>Output</h1>
  * 
  * <p>
- * Outputs the user ID as a {@link org.apache.hadoop.io.VLongWritable} mapped to the item ID and preference as a
+ * Outputs the user ID as a {@link org.apache.mahout.math.VarLongWritable} mapped to the item ID and preference as a
  * {@link EntityPrefWritable}.
  * </p>
  */

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -29,9 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -41,12 +39,14 @@ import org.apache.mahout.cf.taste.hadoop
 import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
 
 public final class AggregateAndRecommendReducer extends MapReduceBase implements
-    Reducer<VLongWritable,VectorWritable,VLongWritable,RecommendedItemsWritable> {
+    Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
 
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -68,8 +68,8 @@ public final class AggregateAndRecommend
       FileSystem fs = FileSystem.get(jobConf);
       Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
       indexItemIDMap = new OpenIntLongHashMap();
-      IntWritable index = new IntWritable();
-      VLongWritable id = new VLongWritable();
+      VarIntWritable index = new VarIntWritable();
+      VarLongWritable id = new VarLongWritable();
       for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
         String path = status.getPath().toString();
         SequenceFile.Reader reader =
@@ -85,9 +85,9 @@ public final class AggregateAndRecommend
   }
 
   @Override
-  public void reduce(VLongWritable key,
+  public void reduce(VarLongWritable key,
                      Iterator<VectorWritable> values,
-                     OutputCollector<VLongWritable,RecommendedItemsWritable> output,
+                     OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
                      Reporter reporter) throws IOException {
     if (!values.hasNext()) {
       return;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,21 +20,21 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public final class AggregateCombiner extends MapReduceBase implements
-    Reducer<VLongWritable,VectorWritable,VLongWritable,VectorWritable> {
+    Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {
 
   @Override
-  public void reduce(VLongWritable key,
+  public void reduce(VarLongWritable key,
                      Iterator<VectorWritable> values,
-                     OutputCollector<VLongWritable,VectorWritable> output,
+                     OutputCollector<VarLongWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
     if (values.hasNext()) {
       Vector partial = values.next().get();

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,20 +19,20 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VectorWritable;
 
 public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
-    Mapper<IntWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
+    Mapper<VarIntWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
 
   @Override
-  public void map(IntWritable key,
+  public void map(VarIntWritable key,
                   VectorWritable value,
-                  OutputCollector<IntWritable,VectorOrPrefWritable> output,
+                  OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
                   Reporter reporter) throws IOException {
     output.collect(key, new VectorOrPrefWritable(value.get()));
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.mahout.math.Varint;
 
 /** A {@link WritableComparable} encapsulating two item indices. */
 public final class IndexIndexWritable
@@ -54,14 +55,14 @@ public final class IndexIndexWritable
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(aID);
-    out.writeInt(bID);
+    Varint.writeUnsignedVarInt(aID, out);
+    Varint.writeUnsignedVarInt(bID, out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    aID = in.readInt();
-    bID = in.readInt();
+    aID = Varint.readUnsignedVarInt(in);
+    bID = Varint.readUnsignedVarInt(in);    
   }
 
   @Override

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,33 +20,42 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 
 public final class ItemIDIndexMapper extends MapReduceBase implements
-    Mapper<LongWritable,Text,IntWritable,VLongWritable> {
+    Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
   
   private static final Pattern COMMA = Pattern.compile(",");
+
+  private boolean transpose;
+
+  @Override
+  public void configure(JobConf jobConf) {
+    transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
+  }
   
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<IntWritable,VLongWritable> output,
+                  OutputCollector<VarIntWritable,VarLongWritable> output,
                   Reporter reporter) throws IOException {
     String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
-    long itemID = Long.parseLong(tokens[1]);
+    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
     int index = idToIndex(itemID);
-    output.collect(new IntWritable(index), new VLongWritable(itemID));
+    output.collect(new VarIntWritable(index), new VarLongWritable(itemID));
   }
   
   static int idToIndex(long itemID) {
     return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
   }
   
-}
\ No newline at end of file
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,20 +20,20 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 
 public final class ItemIDIndexReducer extends MapReduceBase implements
-    Reducer<IntWritable,VLongWritable,IntWritable,VLongWritable> {
+    Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
   
   @Override
-  public void reduce(IntWritable index,
-                     Iterator<VLongWritable> possibleItemIDs,
-                     OutputCollector<IntWritable,VLongWritable> output,
+  public void reduce(VarIntWritable index,
+                     Iterator<VarLongWritable> possibleItemIDs,
+                     OutputCollector<VarIntWritable,VarLongWritable> output,
                      Reporter reporter) throws IOException {
     if (possibleItemIDs.hasNext()) {
       long minimumItemID = Long.MAX_VALUE;
@@ -43,7 +43,7 @@ public final class ItemIDIndexReducer ex
           minimumItemID = itemID;
         }
       }
-      output.collect(index, new VLongWritable(minimumItemID));
+      output.collect(index, new VarLongWritable(minimumItemID));
     }
   }
   

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,132 +18,61 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.List;
 
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.function.LongFloatProcedure;
-import org.apache.mahout.math.function.LongProcedure;
-import org.apache.mahout.math.map.OpenLongFloatHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class PartialMultiplyReducer extends MapReduceBase implements
-    Reducer<IntWritable,VectorOrPrefWritable,VLongWritable,VectorWritable> {
-
-  private static final Logger log = LoggerFactory.getLogger(PartialMultiplyReducer.class);
-
-  private static final int MAX_PRODUCTS_PER_ITEM = 100;
-
-  private enum Counters {
-    PRODUCTS_OUTPUT,
-    PRODUCTS_SKIPPED,
-  }
+
+public final class PartialMultiplyMapper extends MapReduceBase implements
+    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
 
   @Override
-  public void reduce(IntWritable key,
-                     Iterator<VectorOrPrefWritable> values,
-                     final OutputCollector<VLongWritable,VectorWritable> output,
-                     final Reporter reporter) {
+  public void map(VarIntWritable key,
+                  VectorAndPrefsWritable vectorAndPrefsWritable,
+                  OutputCollector<VarLongWritable, VectorWritable> output,
+                  Reporter reporter) throws IOException {
 
     int itemIndex = key.get();
-    OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
 
-    Vector cooccurrenceColumn = null;
-    while (values.hasNext()) {
-      VectorOrPrefWritable value = values.next();
-      if (value.getVector() == null) {
-        // Then this is a user-pref value
-        savedValues.put(value.getUserID(), value.getValue());
-      } else {
-        // Then this is the column vector
-        if (cooccurrenceColumn != null) {
-          throw new IllegalStateException("Found two co-occurrence columns for item index " + itemIndex);
-        }
-        cooccurrenceColumn = value.getVector();
-      }
-    }
+    Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
+    List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
+    List<Float> prefValues = vectorAndPrefsWritable.getValues();
 
-    final VLongWritable userIDWritable = new VLongWritable();
+    VarLongWritable userIDWritable = new VarLongWritable();
 
     // These single-element vectors ensure that each user will not be recommended
     // this item
     Vector excludeVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
     excludeVector.set(itemIndex, Double.NaN);
-    final VectorWritable excludeWritable = new VectorWritable(excludeVector);
+    VectorWritable excludeWritable = new VectorWritable(excludeVector);
     excludeWritable.setWritesLaxPrecision(true);
-    savedValues.forEachKey(new LongProcedure() {
-      @Override
-      public boolean apply(long userID) {
-        userIDWritable.set(userID);
-        try {
-          output.collect(userIDWritable, excludeWritable);
-        } catch (IOException ioe) {
-          throw new IllegalStateException(ioe);
-        }
-        return true;
-      }
-    });
-
-    if (cooccurrenceColumn == null) {
-      log.info("Column vector missing for {}; continuing", itemIndex);
-      return;
-    }    
-
-    final float smallestLargeValue = findSmallestLargeValue(savedValues);
+    for (long userID : userIDs) {
+      userIDWritable.set(userID);
+      output.collect(userIDWritable, excludeWritable);
+    }
 
-    final VectorWritable vectorWritable = new VectorWritable();
+    VectorWritable vectorWritable = new VectorWritable();
     vectorWritable.setWritesLaxPrecision(true);
 
-    final Vector theColumn = cooccurrenceColumn;
-    savedValues.forEachPair(new LongFloatProcedure() {
-      @Override
-      public boolean apply(long userID, float value) {
-        if (Math.abs(value) < smallestLargeValue) {
-          reporter.incrCounter(Counters.PRODUCTS_SKIPPED, 1L);
-        } else {
-          try {
-            Vector partialProduct = value == 1.0f ? theColumn : theColumn.times(value);
-            userIDWritable.set(userID);
-            vectorWritable.set(partialProduct);
-            output.collect(userIDWritable, vectorWritable);
-          } catch (IOException ioe) {
-            throw new IllegalStateException(ioe);
-          }
-          reporter.incrCounter(Counters.PRODUCTS_OUTPUT, 1L);
-        }
-        return true;
+    for (int i = 0; i < userIDs.size(); i++) {
+      long userID = userIDs.get(i);
+      float prefValue = prefValues.get(i);
+      if (!Float.isNaN(prefValue)) {
+        Vector partialProduct = prefValue == 1.0f ? cooccurrenceColumn : 
+            cooccurrenceColumn.times(prefValue);
+        userIDWritable.set(userID);
+        vectorWritable.set(partialProduct);
+        output.collect(userIDWritable, vectorWritable);
       }
-    });
-
-  }
+    }
 
-  private static float findSmallestLargeValue(OpenLongFloatHashMap savedValues) {
-    final PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(MAX_PRODUCTS_PER_ITEM + 1);
-    savedValues.forEachPair(new LongFloatProcedure() {
-      @Override
-      public boolean apply(long userID, float value) {
-        if (topPrefValues.size() < MAX_PRODUCTS_PER_ITEM) {
-          topPrefValues.add(value);
-        } else {
-          float absValue = Math.abs(value);
-          if (absValue > topPrefValues.peek()) {
-            topPrefValues.add(absValue);
-            topPrefValues.poll();
-          }
-        }
-        return true;
-      }
-    });
-    return topPrefValues.peek();
   }
 
 }
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -27,23 +27,21 @@ import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.MultipleInputs;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
 
 /**
@@ -96,24 +94,24 @@ public final class RecommenderJob extend
     String userVectorPath = tempDirPath + "/userVectors";
     String itemIDIndexPath = tempDirPath + "/itemIDIndex";
     String cooccurrencePath = tempDirPath + "/cooccurrence";
-    String parialMultiplyPath = tempDirPath + "/partialMultiply";
+    String partialMultiplyPath = tempDirPath + "/partialMultiply";
 
     AtomicInteger currentPhase = new AtomicInteger();
 
     JobConf itemIDIndexConf = prepareJobConf(
       inputPath, itemIDIndexPath, TextInputFormat.class,
-      ItemIDIndexMapper.class, IntWritable.class, VLongWritable.class,
-      ItemIDIndexReducer.class, IntWritable.class, VLongWritable.class,
+      ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
+      ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
       SequenceFileOutputFormat.class);
-    itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);    
+    itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       JobClient.runJob(itemIDIndexConf);
     }
     
     JobConf toUserVectorConf = prepareJobConf(
       inputPath, userVectorPath, TextInputFormat.class,
-      ToItemPrefsMapper.class, VLongWritable.class, booleanData ? VLongWritable.class : EntityPrefWritable.class,
-      ToUserVectorReducer.class, VLongWritable.class, VectorWritable.class,
+      ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+      ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
       SequenceFileOutputFormat.class);
     toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
@@ -122,20 +120,18 @@ public final class RecommenderJob extend
 
     JobConf toCooccurrenceConf = prepareJobConf(
       userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
-      UserVectorToCooccurrenceMapper.class, IndexIndexWritable.class, IntWritable.class,
-      UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
+      UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
+      UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
       SequenceFileOutputFormat.class);
     setIOSort(toCooccurrenceConf);
-    toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
-    toCooccurrenceConf.setClass("mapred.partitioner.class", FirstIndexPartitioner.class, Partitioner.class);
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       JobClient.runJob(toCooccurrenceConf);
     }
 
     JobConf partialMultiplyConf = prepareJobConf(
-      cooccurrencePath, parialMultiplyPath, SequenceFileInputFormat.class,
-      CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
-      PartialMultiplyReducer.class, VLongWritable.class, VectorWritable.class,
+      cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
+      CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+      ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
       SequenceFileOutputFormat.class);
     MultipleInputs.addInputPath(
         partialMultiplyConf,
@@ -153,9 +149,9 @@ public final class RecommenderJob extend
     }
 
     JobConf aggregateAndRecommendConf = prepareJobConf(
-        parialMultiplyPath, outputPath, SequenceFileInputFormat.class,
-        IdentityMapper.class, VLongWritable.class, VectorWritable.class,
-        AggregateAndRecommendReducer.class, VLongWritable.class, RecommendedItemsWritable.class,
+        partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+        PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+        AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
         TextOutputFormat.class);
     setIOSort(aggregateAndRecommendConf);
     aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,13 +20,13 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
@@ -34,7 +34,7 @@ import org.apache.mahout.math.VectorWrit
  * <h1>Input</h1>
  * 
  * <p>
- * Takes user IDs as {@link VLongWritable} mapped to all associated item IDs and preference values, as
+ * Takes user IDs as {@link VarLongWritable} mapped to all associated item IDs and preference values, as
  * {@link EntityPrefWritable}s.
  * </p>
  * 
@@ -48,19 +48,19 @@ import org.apache.mahout.math.VectorWrit
  * </p>
  */
 public final class ToUserVectorReducer extends MapReduceBase implements
-    Reducer<VLongWritable,VLongWritable,VLongWritable,VectorWritable> {
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
   
   @Override
-  public void reduce(VLongWritable userID,
-                     Iterator<VLongWritable> itemPrefs,
-                     OutputCollector<VLongWritable,VectorWritable> output,
+  public void reduce(VarLongWritable userID,
+                     Iterator<VarLongWritable> itemPrefs,
+                     OutputCollector<VarLongWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
     if (!itemPrefs.hasNext()) {
       return;
     }
     Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
     while (itemPrefs.hasNext()) {
-      VLongWritable itemPref = itemPrefs.next();
+      VarLongWritable itemPref = itemPrefs.next();
       int index = ItemIDIndexMapper.idToIndex(itemPref.get());
       float value;
       if (itemPref instanceof EntityPrefWritable) {

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,62 +18,50 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
 
-/**
- * <h1>Input</h1>
- * 
- * <p>
- * Takes user IDs as {@link VLongWritable} mapped to all associated item IDs and preference values, as
- * {@link EntityPrefWritable}s.
- * </p>
- * 
- * <h1>Output</h1>
- * 
- * <p>
- * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
- * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
- * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
- * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
- * </p>
- */
-public final class ToUserVectorReducer extends MapReduceBase implements
-    Reducer<VLongWritable,VLongWritable,VLongWritable,VectorWritable> {
-  
+public final class ToVectorAndPrefReducer extends MapReduceBase implements
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
+
   @Override
-  public void reduce(VLongWritable userID,
-                     Iterator<VLongWritable> itemPrefs,
-                     OutputCollector<VLongWritable,VectorWritable> output,
+  public void reduce(VarIntWritable key,
+                     Iterator<VectorOrPrefWritable> values,
+                     OutputCollector<VarIntWritable,VectorAndPrefsWritable> output,
                      Reporter reporter) throws IOException {
-    if (!itemPrefs.hasNext()) {
-      return;
-    }
-    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-    while (itemPrefs.hasNext()) {
-      VLongWritable itemPref = itemPrefs.next();
-      int index = ItemIDIndexMapper.idToIndex(itemPref.get());
-      float value;
-      if (itemPref instanceof EntityPrefWritable) {
-        value = ((EntityPrefWritable) itemPref).getPrefValue();
+
+    List<Long> userIDs = new ArrayList<Long>();
+    List<Float> prefValues = new ArrayList<Float>();
+    Vector cooccurrenceColumn = null;
+    while (values.hasNext()) {
+      VectorOrPrefWritable value = values.next();
+      if (value.getVector() == null) {
+        // Then this is a user-pref value
+        userIDs.add(value.getUserID());
+        prefValues.add(value.getValue());
       } else {
-        value = 1.0f;
+        // Then this is the column vector
+        if (cooccurrenceColumn != null) {
+          throw new IllegalStateException("Found two co-occurrence columns for item index " + key.get());
+        }
+        cooccurrenceColumn = value.getVector();
       }
-      userVector.set(index, value);
     }
 
-    VectorWritable vw = new VectorWritable(userVector);
-    vw.setWritesLaxPrecision(true);
-    output.collect(userID, vw);
+    if (cooccurrenceColumn == null) {
+      return;
+    }
+
+    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
+    output.collect(key, vectorAndPrefs);
   }
-  
-}
+
+}
\ No newline at end of file

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,15 +21,15 @@ import org.apache.hadoop.io.ArrayWritabl
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.GenericWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VIntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 
 public final class TupleWritable extends ArrayWritable {
   
   public static class Field extends GenericWritable {
     
-    private static final Class<?>[] CLASSES = {VIntWritable.class, VLongWritable.class, DoubleWritable.class,
+    private static final Class<?>[] CLASSES = {VarIntWritable.class, VarLongWritable.class, DoubleWritable.class,
                                                Text.class};
     
     @Override
@@ -98,8 +98,8 @@ public final class TupleWritable extends
   public int getInt(int idx) {
     Field field = get(idx);
     Class<? extends Writable> wrappedClass = field.get().getClass();
-    if (wrappedClass.equals(VIntWritable.class)) {
-      return ((VIntWritable) field.get()).get();
+    if (wrappedClass.equals(VarIntWritable.class)) {
+      return ((VarIntWritable) field.get()).get();
     } else {
       throw new IllegalArgumentException("Not an integer: " + wrappedClass);
     }
@@ -108,8 +108,8 @@ public final class TupleWritable extends
   public long getLong(int idx) {
     Field field = get(idx);
     Class<? extends Writable> wrappedClass = field.get().getClass();
-    if (wrappedClass.equals(VLongWritable.class)) {
-      return ((VLongWritable) field.get()).get();
+    if (wrappedClass.equals(VarLongWritable.class)) {
+      return ((VarLongWritable) field.get()).get();
     } else {
       throw new IllegalArgumentException("Not a long: " + wrappedClass);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,12 +19,11 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.PriorityQueue;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -32,12 +31,15 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public final class UserVectorSplitterMapper extends MapReduceBase implements
-    Mapper<VLongWritable,VectorWritable,IntWritable,VectorOrPrefWritable> {
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
 
+  private static final int MAX_PREFS_CONSIDERED = 10;  
   static final String USERS_FILE = "usersFile";
   
   private FastIDSet usersToRecommendFor;
@@ -63,26 +65,63 @@ public final class UserVectorSplitterMap
   }
 
   @Override
-  public void map(VLongWritable key,
+  public void map(VarLongWritable key,
                   VectorWritable value,
-                  OutputCollector<IntWritable,VectorOrPrefWritable> output,
+                  OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
                   Reporter reporter) throws IOException {
     long userID = key.get();
     if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
       return;
     }
-    Vector userVector = value.get();
+    Vector userVector = maybePruneUserVector(value.get());
     Iterator<Vector.Element> it = userVector.iterateNonZero();
-    IntWritable itemIndexWritable = new IntWritable();
+    VarIntWritable itemIndexWritable = new VarIntWritable();
     VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
     while (it.hasNext()) {
       Vector.Element e = it.next();
-      int itemIndex = e.index();
-      double preferenceValue = e.get();
-      itemIndexWritable.set(itemIndex);
-      vectorOrPref.set(userID, (float) preferenceValue);
+      itemIndexWritable.set(e.index());
+      vectorOrPref.set(userID, (float) e.get());
       output.collect(itemIndexWritable, vectorOrPref);
     }
   }
 
+  private static Vector maybePruneUserVector(Vector userVector) {
+    if (userVector.getNumNondefaultElements() <= MAX_PREFS_CONSIDERED) {
+      return userVector;
+    }
+
+    float smallestLargeValue = findSmallestLargeValue(userVector);
+
+    // "Blank out" small-sized prefs to reduce the amount of partial products
+    // generated later. They're not zeroed, but NaN-ed, so they come through
+    // and can be used to exclude these items from prefs.
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      Vector.Element e = it.next();
+      float absValue = Math.abs((float) e.get());
+      if (absValue < smallestLargeValue) {
+        e.set(Float.NaN);
+      }
+    }
+
+    return userVector;
+  }
+
+  private static float findSmallestLargeValue(Vector userVector) {
+    PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(MAX_PREFS_CONSIDERED + 1);
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      float absValue = Math.abs((float) it.next().get());
+      if (topPrefValues.size() < MAX_PREFS_CONSIDERED) {
+        topPrefValues.add(absValue);
+      } else {
+        if (absValue > topPrefValues.peek()) {
+          topPrefValues.add(absValue);
+          topPrefValues.poll();
+        }
+      }
+    }
+    return topPrefValues.peek();
+  }
+
 }
\ No newline at end of file
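
The pruning added above NaNs out every preference whose absolute value falls below the MAX_PREFS_CONSIDERED-th largest in the user vector, so only the strongest prefs produce partial products downstream. A minimal standalone sketch of the same top-N selection over a plain double[], with illustrative names rather than the Mahout/Hadoop classes:

    import java.util.Arrays;
    import java.util.PriorityQueue;

    // Sketch only: mirrors the pruning strategy in UserVectorSplitterMapper,
    // keeping the largest-magnitude preferences and NaN-ing the rest.
    public final class TopPrefsPruneSketch {

      private static final int MAX_PREFS = 10;

      static void pruneInPlace(double[] prefs) {
        if (prefs.length <= MAX_PREFS) {
          return;
        }
        // Min-heap holding the MAX_PREFS largest absolute values seen so far
        PriorityQueue<Double> top = new PriorityQueue<Double>(MAX_PREFS + 1);
        for (double p : prefs) {
          double abs = Math.abs(p);
          if (top.size() < MAX_PREFS) {
            top.add(abs);
          } else if (abs > top.peek()) {
            top.add(abs);
            top.poll();
          }
        }
        double threshold = top.peek();
        // "Blank out" everything below the threshold; NaN rather than zero so
        // downstream code can still tell the item was rated.
        for (int i = 0; i < prefs.length; i++) {
          if (Math.abs(prefs[i]) < threshold) {
            prefs[i] = Double.NaN;
          }
        }
      }

      public static void main(String[] args) {
        double[] prefs = {5.0, -4.5, 0.5, 3.0, 1.0, 2.0, -0.1, 4.0, 3.5, 2.5, 1.5, 0.2};
        pruneInPlace(prefs);
        System.out.println(Arrays.toString(prefs));  // -0.1 and 0.2 become NaN
      }
    }

The min-heap keeps the selection at a handful of entries of memory per user regardless of how many preferences the user has, which is why the mapper selects this way rather than sorting the whole vector.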

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,49 +22,56 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.PriorityQueue;
 
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
-    Mapper<VLongWritable,VectorWritable,IndexIndexWritable,IntWritable> {
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VarIntWritable> {
 
   private static final int MAX_PREFS_CONSIDERED = 100;
 
-  private boolean outputGuardValue = true;
+  private enum Counters {
+    USER_PREFS_SKIPPED,
+  }
+
   private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
 
   @Override
-  public void map(VLongWritable userID,
+  public void map(VarLongWritable userID,
                   VectorWritable userVectorWritable,
-                  OutputCollector<IndexIndexWritable,IntWritable> output,
+                  OutputCollector<VarIntWritable,VarIntWritable> output,
                   Reporter reporter) throws IOException {
+
     Vector userVector = userVectorWritable.get();
     countSeen(userVector);
-    userVector = maybePruneUserVector(userVector);    
+
+    int originalSize = userVector.getNumNondefaultElements();
+    userVector = maybePruneUserVector(userVector);
+    int newSize = userVector.getNumNondefaultElements();
+    if (newSize < originalSize) {
+      reporter.incrCounter(Counters.USER_PREFS_SKIPPED, originalSize - newSize);
+    }
+
     Iterator<Vector.Element> it = userVector.iterateNonZero();
-    IndexIndexWritable entityEntity = new IndexIndexWritable();
-    IntWritable one = new IntWritable(1);
+    VarIntWritable itemIndex1 = new VarIntWritable();
+    VarIntWritable itemIndex2 = new VarIntWritable();
     while (it.hasNext()) {
       int index1 = it.next().index();
+      itemIndex1.set(index1);
       Iterator<Vector.Element> it2 = userVector.iterateNonZero();
       while (it2.hasNext()) {
         int index2 = it2.next().index();
-        entityEntity.set(index1, index2);
-        output.collect(entityEntity, one);
+        itemIndex2.set(index2);
+        output.collect(itemIndex1, itemIndex2);
       }
     }
-    // Guard value, output once, sorts after everything; will be dropped by combiner
-    if (outputGuardValue) {
-      output.collect(new IndexIndexWritable(Integer.MAX_VALUE, Integer.MAX_VALUE), one);
-      outputGuardValue = false;
-    }
   }
 
   private Vector maybePruneUserVector(Vector userVector) {
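
With the combiner and guard value gone, the mapper now emits one (itemIndex1, itemIndex2) pair for every ordered pair of items in the pruned user vector, including each item paired with itself. A standalone sketch of that emission for a single user (plain Java, illustrative only; the real mapper streams VarIntWritable pairs through the OutputCollector):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: what one user vector contributes to the co-occurrence job.
    public final class PairEmissionSketch {

      static List<int[]> emitPairs(int[] itemIndices) {
        List<int[]> pairs = new ArrayList<int[]>();
        for (int index1 : itemIndices) {
          for (int index2 : itemIndices) {
            pairs.add(new int[]{index1, index2});  // key = index1, value = index2
          }
        }
        return pairs;
      }

      public static void main(String[] args) {
        for (int[] pair : emitPairs(new int[]{7, 11, 13})) {
          System.out.println(pair[0] + " -> " + pair[1]);  // 9 pairs for 3 items
        }
      }
    }

A vector with n surviving prefs emits n^2 pairs, which is why the vector is capped at MAX_PREFS_CONSIDERED items and why the pruned prefs are reported via the USER_PREFS_SKIPPED counter.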

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,64 +20,31 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
-    Reducer<IndexIndexWritable,IntWritable,IntWritable,VectorWritable> {
-
-  private int lastItem1ID = Integer.MIN_VALUE;
-  private int lastItem2ID = Integer.MIN_VALUE;
-  private Vector cooccurrenceRow = null;
-  private int count = 0;
+    Reducer<VarIntWritable,VarIntWritable,VarIntWritable,VectorWritable> {
 
   @Override
-  public void reduce(IndexIndexWritable entityEntity,
-                     Iterator<IntWritable> counts,
-                     OutputCollector<IntWritable,VectorWritable> output,
+  public void reduce(VarIntWritable itemIndex1,
+                     Iterator<VarIntWritable> itemIndex2s,
+                     OutputCollector<VarIntWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
-
-    int item1ID = entityEntity.getAID();
-    int item2ID = entityEntity.getBID();
-    int sum = CooccurrenceCombiner.sum(counts);
-
-    if (item1ID < lastItem1ID) {
-      throw new IllegalStateException();
-    }
-    if (item1ID == lastItem1ID) {
-      if (item2ID < lastItem2ID) {
-        throw new IllegalStateException();
-      }
-      if (item2ID == lastItem2ID) {
-        count += sum;
-      } else {
-        if (cooccurrenceRow == null) {
-          cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-        }
-        cooccurrenceRow.set(lastItem2ID, count);
-        lastItem2ID = item2ID;
-        count = sum;
-      }
-    } else {
-      if (cooccurrenceRow != null) {
-        if (count > 0) {
-          cooccurrenceRow.set(lastItem2ID, count);
-        }
-        VectorWritable vw = new VectorWritable(cooccurrenceRow);
-        vw.setWritesLaxPrecision(true);
-        output.collect(new IntWritable(lastItem1ID), vw);
-      }
-      lastItem1ID = item1ID;
-      lastItem2ID = item2ID;
-      cooccurrenceRow = null;
-      count = sum;
+    Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    while (itemIndex2s.hasNext()) {
+      int itemIndex2 = itemIndex2s.next().get();
+      cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
     }
+    VectorWritable vw = new VectorWritable(cooccurrenceRow);
+    vw.setWritesLaxPrecision(true);
+    output.collect(itemIndex1, vw);
   }
   
 }
\ No newline at end of file
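
Since the mapper now emits raw item-index pairs instead of pre-summed counts, the reducer above just tallies the values that arrive for each key into one sparse co-occurrence row. A standalone sketch of that counting step, with a HashMap standing in for RandomAccessSparseVector:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Sketch only: count the co-occurring item indices for a single row key.
    public final class CooccurrenceRowSketch {

      static Map<Integer, Double> buildRow(Iterator<Integer> itemIndex2s) {
        Map<Integer, Double> row = new HashMap<Integer, Double>();
        while (itemIndex2s.hasNext()) {
          int itemIndex2 = itemIndex2s.next();
          Double current = row.get(itemIndex2);
          row.put(itemIndex2, current == null ? 1.0 : current + 1.0);
        }
        return row;
      }

      public static void main(String[] args) {
        Iterator<Integer> values = Arrays.asList(3, 5, 3, 3, 9).iterator();
        System.out.println(buildRow(values));  // e.g. {3=3.0, 5=1.0, 9=1.0}
      }
    }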

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java (from r945806, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java&r1=945806&r2=947844&rev=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java Mon May 24 22:44:51 2010
@@ -1,11 +1,10 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -21,80 +20,65 @@ package org.apache.mahout.cf.taste.hadoo
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.Varint;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-public final class VectorOrPrefWritable implements Writable {
+public final class VectorAndPrefsWritable implements Writable {
 
   private Vector vector;
-  private long userID;
-  private float value;
+  private List<Long> userIDs;
+  private List<Float> values;
 
-  public VectorOrPrefWritable() {
+  public VectorAndPrefsWritable() {
   }
 
-  public VectorOrPrefWritable(Vector vector) {
+  public VectorAndPrefsWritable(Vector vector, List<Long> userIDs, List<Float> values) {
     this.vector = vector;
-  }
-
-  public VectorOrPrefWritable(long userID, float value) {
-    this.userID = userID;
-    this.value = value;
+    this.userIDs = userIDs;
+    this.values = values;
   }
 
   public Vector getVector() {
     return vector;
   }
 
-  public long getUserID() {
-    return userID;
-  }
-
-  public float getValue() {
-    return value;
-  }
-
-  public void set(Vector vector) {
-    this.vector = vector;
-    this.userID = Long.MIN_VALUE;
-    this.value = Float.NaN;
+  public List<Long> getUserIDs() {
+    return userIDs;
   }
 
-  public void set(long userID, float value) {
-    this.vector = null;
-    this.userID = userID;
-    this.value = value;
+  public List<Float> getValues() {
+    return values;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    if (vector == null) {
-      out.writeBoolean(false);
-      WritableUtils.writeVLong(out, userID);
-      out.writeFloat(value);
-    } else {
-      out.writeBoolean(true);
-      VectorWritable vw = new VectorWritable(vector);
-      vw.setWritesLaxPrecision(true);
-      vw.write(out);
+    VectorWritable vw = new VectorWritable(vector);
+    vw.setWritesLaxPrecision(true);
+    vw.write(out);
+    Varint.writeUnsignedVarInt(userIDs.size(), out);
+    for (int i = 0; i < userIDs.size(); i++) {
+      Varint.writeSignedVarLong(userIDs.get(i), out);
+      out.writeFloat(values.get(i));
     }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    boolean hasVector = in.readBoolean();
-    if (hasVector) {
-      VectorWritable writable = new VectorWritable();
-      writable.readFields(in);
-      set(writable.get());
-    } else {
-      long theUserID = WritableUtils.readVLong(in);
-      float theValue = in.readFloat();
-      set(theUserID, theValue);
+    VectorWritable writable = new VectorWritable();
+    writable.readFields(in);
+    vector = writable.get();
+    int size = Varint.readUnsignedVarInt(in);
+    userIDs = new ArrayList<Long>(size);
+    values = new ArrayList<Float>(size);
+    for (int i = 0; i < size; i++) {
+      userIDs.add(Varint.readSignedVarLong(in));
+      values.add(in.readFloat());
     }
   }
 
-}
+}
\ No newline at end of file
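
The new writable's wire format is the vector (written with lax precision), then an unsigned varint count, then one (signed varlong userID, float value) pair per preference. A round-trip sketch of just the userID/value tail of that format, assuming mahout-math's Varint is on the classpath (the vector portion is omitted for brevity):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.mahout.math.Varint;

    // Sketch only: serialize and deserialize the (userID, value) pairs the way
    // VectorAndPrefsWritable does, to show the varint-based layout.
    public final class PrefsRoundTripSketch {

      public static void main(String[] args) throws IOException {
        List<Long> userIDs = Arrays.asList(123L, -45678L, 9L);
        List<Float> values = Arrays.asList(3.0f, 4.5f, 1.0f);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Varint.writeUnsignedVarInt(userIDs.size(), out);
        for (int i = 0; i < userIDs.size(); i++) {
          Varint.writeSignedVarLong(userIDs.get(i), out);
          out.writeFloat(values.get(i));
        }
        out.close();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int size = Varint.readUnsignedVarInt(in);
        List<Long> readIDs = new ArrayList<Long>(size);
        List<Float> readValues = new ArrayList<Float>(size);
        for (int i = 0; i < size; i++) {
          readIDs.add(Varint.readSignedVarLong(in));
          readValues.add(in.readFloat());
        }
        System.out.println(readIDs + " / " + readValues);  // echoes the originals
      }
    }

Small IDs and counts encode to just one or two bytes each this way.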

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Mon May 24 22:44:51 2010
@@ -23,7 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.mahout.math.Varint;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
@@ -73,7 +73,7 @@ public final class VectorOrPrefWritable 
   public void write(DataOutput out) throws IOException {
     if (vector == null) {
       out.writeBoolean(false);
-      WritableUtils.writeVLong(out, userID);
+      Varint.writeSignedVarLong(userID, out);
       out.writeFloat(value);
     } else {
       out.writeBoolean(true);
@@ -91,7 +91,7 @@ public final class VectorOrPrefWritable 
       writable.readFields(in);
       set(writable.get());
     } else {
-      long theUserID = WritableUtils.readVLong(in);
+      long theUserID = Varint.readSignedVarLong(in);
       float theValue = in.readFloat();
       set(theUserID, theValue);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobClient;
@@ -34,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.math.VarLongWritable;
 
 /**
  * <p>
@@ -130,8 +130,8 @@ public final class RecommenderJob extend
     int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
     
     JobConf jobConf = prepareJobConf(usersFile, outputPath, TextInputFormat.class,
-      UserIDsMapper.class, VLongWritable.class, NullWritable.class, RecommenderReducer.class,
-      VLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
+      UserIDsMapper.class, VarLongWritable.class, NullWritable.class, RecommenderReducer.class,
+      VarLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
     
     jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
     jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -27,7 +27,6 @@ import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -39,6 +38,7 @@ import org.apache.mahout.cf.taste.impl.m
 import org.apache.mahout.cf.taste.model.DataModel;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.cf.taste.recommender.Recommender;
+import org.apache.mahout.math.VarLongWritable;
 
 /**
  * <p>
@@ -50,7 +50,7 @@ import org.apache.mahout.cf.taste.recomm
  * @see RecommenderJob
  */
 public final class RecommenderReducer extends MapReduceBase implements
-    Reducer<VLongWritable,NullWritable,VLongWritable,RecommendedItemsWritable> {
+    Reducer<VarLongWritable,NullWritable,VarLongWritable,RecommendedItemsWritable> {
   
   static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -94,9 +94,9 @@ public final class RecommenderReducer ex
   }
   
   @Override
-  public void reduce(VLongWritable key,
+  public void reduce(VarLongWritable key,
                      Iterator<NullWritable> values,
-                     OutputCollector<VLongWritable,RecommendedItemsWritable> output,
+                     OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
                      Reporter reporter) throws IOException {
     long userID = key.get();
     List<RecommendedItem> recommendedItems;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,27 +22,27 @@ import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
 
 /**
  * Extracts and emits all user IDs from the users file, or input file.
  */
 public final class UserIDsMapper extends MapReduceBase implements
-    Mapper<LongWritable,Text,VLongWritable,NullWritable> {
+    Mapper<LongWritable,Text,VarLongWritable,NullWritable> {
   
   @Override
   public void map(LongWritable key,
                   Text value,
-                  OutputCollector<VLongWritable,NullWritable> output,
+                  OutputCollector<VarLongWritable,NullWritable> output,
                   Reporter reporter) throws IOException {
     String line = value.toString();
     int comma = line.indexOf(',');
     long userID = comma >= 0 ? Long.parseLong(line.substring(0, comma)) : Long.parseLong(line);
-    output.collect(new VLongWritable(userID), NullWritable.get());
+    output.collect(new VarLongWritable(userID), NullWritable.get());
   }
   
 }
\ No newline at end of file
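
The user ID here is parsed as everything before the first comma, or the whole line if there is no comma. A tiny standalone sketch of that parsing:

    // Sketch only: mirrors the line parsing in UserIDsMapper.
    public final class UserIdParseSketch {

      static long parseUserID(String line) {
        int comma = line.indexOf(',');
        return comma >= 0 ? Long.parseLong(line.substring(0, comma)) : Long.parseLong(line);
      }

      public static void main(String[] args) {
        System.out.println(parseUserID("12345,678,4.5"));  // 12345
        System.out.println(parseUserID("98765"));          // 98765
      }
    }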

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,22 +19,22 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
+import org.apache.mahout.math.VarLongWritable;
 
 /**
  * map out each pair of items that appears in the same user-vector together with the multiplied vector lengths
  * of the associated item vectors
  */
 public final class CopreferredItemsMapper extends MapReduceBase
-    implements Mapper<VLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
+    implements Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
 
   @Override
-  public void map(VLongWritable user,
+  public void map(VarLongWritable user,
                   ItemPrefWithItemVectorWeightArrayWritable itemPrefsArray,
                   OutputCollector<ItemPairWritable, CoRating> output,
                   Reporter reporter)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,13 +22,13 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Varint;
 
 /**
  * a writable key that is used by {@link CountUsersMapper} and {@link CountUsersReducer} to
@@ -52,12 +52,12 @@ public class CountUsersKeyWritable imple
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    userID = WritableUtils.readVLong(in);
+    userID = Varint.readSignedVarLong(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVLong(out, userID);
+    Varint.writeSignedVarLong(userID, out);
   }
 
   @Override
@@ -81,10 +81,10 @@ public class CountUsersKeyWritable imple
   /**
    * all userIDs go to the same partition
    */
-  public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VLongWritable> {
+  public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VarLongWritable> {
 
     @Override
-    public int getPartition(CountUsersKeyWritable key, VLongWritable value, int numPartitions) {
+    public int getPartition(CountUsersKeyWritable key, VarLongWritable value, int numPartitions) {
       return 0;
     }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java?rev=947844&r1=947843&r2=947844&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java Mon May 24 22:44:51 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,29 +22,29 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.VarLongWritable;
 
 /**
  * maps out the userIDs in a way that we can use a secondary sort on them
  */
 public class CountUsersMapper extends MapReduceBase
-    implements Mapper<LongWritable,Text,CountUsersKeyWritable,VLongWritable> {
+    implements Mapper<LongWritable,Text,CountUsersKeyWritable,VarLongWritable> {
 
   private static final Pattern DELIMITER = Pattern.compile("[\t,]");
 
   @Override
   public void map(LongWritable arg0, Text value,
-      OutputCollector<CountUsersKeyWritable,VLongWritable> out, Reporter reporter)
+      OutputCollector<CountUsersKeyWritable,VarLongWritable> out, Reporter reporter)
       throws IOException {
 
     String[] tokens = DELIMITER.split(value.toString());
     long userID = Long.parseLong(tokens[0]);
 
-    out.collect(new CountUsersKeyWritable(userID), new VLongWritable(userID));
+    out.collect(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
   }
 
 }
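
CountUsersKeyWritable sends every user ID to partition 0 so that, with the secondary sort the mapper's Javadoc mentions, a single reducer sees all IDs in sorted order and can count distinct users by watching for value changes instead of buffering IDs. The reducer itself is not part of this diff, so the following is only an illustrative sketch of that counting idea:

    // Sketch only: count distinct values in an already-sorted stream of IDs,
    // the way a reducer can once all keys land in one sorted partition.
    public final class CountDistinctSortedSketch {

      public static void main(String[] args) {
        long[] sortedUserIDs = {3L, 3L, 7L, 7L, 7L, 42L};
        int distinct = 0;
        boolean first = true;
        long previous = 0L;
        for (long id : sortedUserIDs) {
          if (first || id != previous) {
            distinct++;
          }
          previous = id;
          first = false;
        }
        System.out.println(distinct);  // 3
      }
    }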