Posted to commits@mahout.apache.org by sr...@apache.org on 2010/04/28 06:37:20 UTC

svn commit: r938782 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout: cf/taste/hadoop/ cf/taste/hadoop/cooccurence/ cf/taste/hadoop/item/ common/

Author: srowen
Date: Wed Apr 28 04:37:19 2010
New Revision: 938782

URL: http://svn.apache.org/viewvc?rev=938782&view=rev
Log:
(More of MAHOUT-385) Another round of refinements to the item-based recommender; per MAHOUT-305, combine the two implementations

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
      - copied, changed from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java
      - copied, changed from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Wed Apr 28 04:37:19 2010
@@ -25,40 +25,60 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
 import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
 
 public final class AggregateAndRecommendReducer extends MapReduceBase implements
-    Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RecommendedItemsWritable> {
+    Reducer<LongWritable,VectorWritable,LongWritable,RecommendedItemsWritable> {
 
   static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
 
+  private static final PathFilter PARTS_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part-");
+    }
+  };
+
   private int recommendationsPerUser;
-  private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+  private OpenIntLongHashMap indexItemIDMap;
 
   @Override
   public void configure(JobConf jobConf) {
+    recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
     try {
       FileSystem fs = FileSystem.get(jobConf);
       Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
-      recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
-      indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+      indexItemIDMap = new OpenIntLongHashMap();
+      IntWritable index = new IntWritable();
+      LongWritable id = new LongWritable();
+      for (FileStatus status : fs.listStatus(itemIDIndexPath, PARTS_FILTER)) {
+        String path = status.getPath().toString();
+        SequenceFile.Reader reader =
+            new SequenceFile.Reader(fs, new Path(path).makeQualified(fs), jobConf);
+        while (reader.next(index, id)) {
+          indexItemIDMap.put(index.get(), id.get());
+        }
+        reader.close();
+      }
     } catch (IOException ioe) {
       throw new IllegalStateException(ioe);
     }
@@ -66,7 +86,7 @@ public final class AggregateAndRecommend
 
   @Override
   public void reduce(LongWritable key,
-                     Iterator<RandomAccessSparseVectorWritable> values,
+                     Iterator<VectorWritable> values,
                      OutputCollector<LongWritable, RecommendedItemsWritable> output,
                      Reporter reporter) throws IOException {
     if (!values.hasNext()) {
@@ -80,22 +100,18 @@ public final class AggregateAndRecommend
     Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
     Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
 
-    Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
-    LongWritable itemID = new LongWritable();
+    Iterator<Vector.Element> recommendationVectorIterator =
+        recommendationVector.iterateNonZero();
     while (recommendationVectorIterator.hasNext()) {
       Vector.Element element = recommendationVectorIterator.next();
       int index = element.index();
       if (topItems.size() < recommendationsPerUser) {
-        LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
-        if (theItemID != null) {
-          topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
-        } // else, huh?
+        long theItemID = indexItemIDMap.get(index);
+        topItems.add(new GenericRecommendedItem(theItemID, (float) element.get()));
       } else if (element.get() > topItems.peek().getValue()) {
-        LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
-        if (theItemID != null) {
-          topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
-          topItems.poll();
-        } // else, huh?
+        long theItemID = indexItemIDMap.get(index);
+        topItems.add(new GenericRecommendedItem(theItemID, (float) element.get()));
+        topItems.poll();
       }
     }
 

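A note on the new in-memory index map: the old MapFilesMap lookup returned null for an unknown index (hence the removed "else, huh?" branches), while OpenIntLongHashMap.get() returns 0 for an absent key, so an unmapped index would now silently become item ID 0. If that distinction matters, a containsKey guard restores the old behavior; a minimal sketch, not part of this commit:

    // Hypothetical guard: skip indices with no known item ID
    if (indexItemIDMap.containsKey(index)) {
      long theItemID = indexItemIDMap.get(index);
      topItems.add(new GenericRecommendedItem(theItemID, (float) element.get()));
    }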
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Wed Apr 28 04:37:19 2010
@@ -25,17 +25,16 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 public final class AggregateCombiner extends MapReduceBase implements
-    Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RandomAccessSparseVectorWritable> {
+    Reducer<LongWritable,VectorWritable,LongWritable,VectorWritable> {
 
   @Override
   public void reduce(LongWritable key,
-                     Iterator<RandomAccessSparseVectorWritable> values,
-                     OutputCollector<LongWritable, RandomAccessSparseVectorWritable> output,
+                     Iterator<VectorWritable> values,
+                     OutputCollector<LongWritable, VectorWritable> output,
                      Reporter reporter) throws IOException {
     if (!values.hasNext()) {
       return;
@@ -44,7 +43,7 @@ public final class AggregateCombiner ext
     while (values.hasNext()) {
       partial = partial.plus(values.next().get());
     }
-    output.collect(key, new RandomAccessSparseVectorWritable((RandomAccessSparseVector) partial));
+    output.collect(key, new VectorWritable(partial));
   }
 
 }
\ No newline at end of file

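The combiner accumulates partial recommendation vectors with Vector.plus(), which allocates a fresh vector on every iteration. An in-place alternative is sketched below, assuming this tree's Vector still exposes assign(Vector, BinaryFunction) — an assumption, not verified against this revision:

    // Hypothetical in-place accumulation (illustration only)
    Vector sum = values.next().get().clone();  // clone: Hadoop reuses writables
    BinaryFunction add = new BinaryFunction() {
      @Override
      public double apply(double a, double b) {
        return a + b;
      }
    };
    while (values.hasNext()) {
      sum = sum.assign(values.next().get(), add);
    }
    output.collect(key, new VectorWritable(sum));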
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Wed Apr 28 04:37:19 2010
@@ -24,18 +24,17 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
 
 public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
-    Mapper<IntWritable, RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+    Mapper<IntWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
 
   @Override
   public void map(IntWritable key,
-                  RandomAccessSparseVectorWritable value,
+                  VectorWritable value,
                   OutputCollector<IntWritable,VectorOrPrefWritable> output,
                   Reporter reporter) throws IOException {
-    output.collect(key, new VectorOrPrefWritable((RandomAccessSparseVector) value.get()));
+    output.collect(key, new VectorOrPrefWritable(value.get()));
   }
 
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java Wed Apr 28 04:37:19 2010
@@ -25,39 +25,36 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
-import org.apache.mahout.math.function.IntIntProcedure;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 public final class CooccurrenceCombiner extends MapReduceBase implements
-    Reducer<IntWritable,EntityCountWritable,IntWritable,EntityCountWritable> {
+    Reducer<IndexIndexWritable,IntWritable,IndexIndexWritable,IntWritable> {
 
-  @Override
-  public void reduce(final IntWritable index1,
-                     Iterator<EntityCountWritable> index2s,
-                     final OutputCollector<IntWritable,EntityCountWritable> output,
-                     Reporter reporter) {
+  private IndexIndexWritable lastEntityEntity =
+      new IndexIndexWritable(Integer.MIN_VALUE, Integer.MIN_VALUE);
+  private int count = 0;
 
-    OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
-    while (index2s.hasNext()) {
-      EntityCountWritable writable = index2s.next();
-      int index = (int) writable.getID();
-      int oldValue = indexCounts.get(index);
-      indexCounts.put(index, oldValue + writable.getCount());
+  @Override
+  public void reduce(IndexIndexWritable entityEntity,
+                     Iterator<IntWritable> counts,
+                     OutputCollector<IndexIndexWritable,IntWritable> output,
+                     Reporter reporter) throws IOException {
+    if (entityEntity.equals(lastEntityEntity)) {
+      count += sum(counts);
+    } else {
+      if (count > 0) {
+        output.collect(lastEntityEntity, new IntWritable(count));
+      }
+      count = sum(counts);     
+      lastEntityEntity = entityEntity.clone();
     }
+  }
 
-    final EntityCountWritable entityCount = new EntityCountWritable();
-    indexCounts.forEachPair(new IntIntProcedure() {
-      @Override
-      public boolean apply(int index, int count) {
-        entityCount.set(index, count);
-        try {
-          output.collect(index1, entityCount);
-        } catch (IOException ioe) {
-          throw new IllegalStateException(ioe);
-        }
-        return true;
-      }
-    });
+  static int sum(Iterator<IntWritable> it) {
+    int sum = 0;
+    while (it.hasNext()) {
+      sum += it.next().get();
+    }
+    return sum;
   }
+
 }
\ No newline at end of file

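This combiner is deliberately stateful: it accumulates across reduce() calls and emits a pair's total only when the next, greater key arrives, relying on the mapper's (Integer.MAX_VALUE, Integer.MAX_VALUE) guard record to force the last flush. The hidden assumption is that each map task's output is combined in a single pass containing the guard; if the framework runs the combiner once per spill, a spill without the guard loses its final key group. A stateless per-key sum avoids the assumption entirely, since Hadoop already groups values by key within a combine pass — an alternative sketch, not what this commit does:

    // Hypothetical stateless combiner (illustration only)
    @Override
    public void reduce(IndexIndexWritable entityEntity,
                       Iterator<IntWritable> counts,
                       OutputCollector<IndexIndexWritable,IntWritable> output,
                       Reporter reporter) throws IOException {
      output.collect(entityEntity, new IntWritable(sum(counts)));
    }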
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java?rev=938782&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/FirstIndexPartitioner.java Wed Apr 28 04:37:19 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public final class FirstIndexPartitioner<V> implements Partitioner<IndexIndexWritable,V> {
+
+  @Override
+  public void configure(JobConf job) {
+    // do nothing
+  }
+
+  @Override
+  public int getPartition(IndexIndexWritable key, V value, int numPartitions) {
+    return key.getAID() % numPartitions;
+  }
+
+}

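The partitioner routes every key sharing a first index to the same reduce task, so each co-occurrence row arrives contiguously at one reducer. Note that getPartition() must return a non-negative value; the bare modulo is safe only while item indices are non-negative, which ItemIDIndexMapper.idToIndex() appears to guarantee. The conventional defensive form, for reference only:

    // Defensive variant (illustration only): mask the sign bit before the modulo
    return (key.getAID() & Integer.MAX_VALUE) % numPartitions;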
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java (from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java&r1=937991&r2=938782&rev=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java Wed Apr 28 04:37:19 2010
@@ -15,88 +15,87 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop;
+package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.mahout.common.RandomUtils;
 
-/** A {@link WritableComparable} encapsulating two items. */
-public final class EntityEntityWritable
-    implements WritableComparable<EntityEntityWritable>, Cloneable {
-  
-  private long aID;
-  private long bID;
-  
-  public EntityEntityWritable() {
+/** A {@link WritableComparable} encapsulating two item indices. */
+public final class IndexIndexWritable
+    implements WritableComparable<IndexIndexWritable>, Cloneable {
+
+  private int aID;
+  private int bID;
+
+  public IndexIndexWritable() {
   // do nothing
   }
-  
-  public EntityEntityWritable(long aID, long bID) {
+
+  public IndexIndexWritable(int aID, int bID) {
     this.aID = aID;
     this.bID = bID;
   }
-  
-  public long getAID() {
+
+  public int getAID() {
     return aID;
   }
-  
-  public long getBID() {
+
+  public int getBID() {
     return bID;
   }
 
-  public void set(long aID, long bID) {
+  public void set(int aID, int bID) {
     this.aID = aID;
     this.bID = bID;
   }
-  
+
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeLong(aID);
-    out.writeLong(bID);
+    out.writeInt(aID);
+    out.writeInt(bID);
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
-    aID = in.readLong();
-    bID = in.readLong();
+    aID = in.readInt();
+    bID = in.readInt();
   }
-  
+
   @Override
-  public int compareTo(EntityEntityWritable that) {
+  public int compareTo(IndexIndexWritable that) {
     int aCompare = compare(aID, that.getAID());
     return aCompare == 0 ? compare(bID, that.getBID()) : aCompare;
   }
-  
-  private static int compare(long a, long b) {
+
+  private static int compare(int a, int b) {
     return a < b ? -1 : a > b ? 1 : 0;
   }
-  
+
   @Override
   public int hashCode() {
-    return RandomUtils.hashLong(aID) + 31 * RandomUtils.hashLong(bID);
+    return aID + 31 * bID;
   }
-  
+
   @Override
   public boolean equals(Object o) {
-    if (o instanceof EntityEntityWritable) {
-      EntityEntityWritable that = (EntityEntityWritable) o;
+    if (o instanceof IndexIndexWritable) {
+      IndexIndexWritable that = (IndexIndexWritable) o;
       return (aID == that.getAID()) && (bID == that.getBID());
     }
     return false;
   }
-  
+
   @Override
   public String toString() {
     return aID + "\t" + bID;
   }
 
   @Override
-  public EntityEntityWritable clone() {
-    return new EntityEntityWritable(aID, bID);
+  public IndexIndexWritable clone() {
+    return new IndexIndexWritable(aID, bID);
   }
-  
+
 }
\ No newline at end of file

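Since IndexIndexWritable is a fixed-width key (two ints) compared heavily during the shuffle sort, a raw byte comparator would let Hadoop compare keys without deserializing them. A sketch of the standard idiom, not part of this commit:

    // Hypothetical raw comparator for IndexIndexWritable (illustration only)
    public static final class Comparator extends WritableComparator {
      public Comparator() {
        super(IndexIndexWritable.class);
      }
      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int a1 = readInt(b1, s1);      // first index is the first 4 bytes
        int a2 = readInt(b2, s2);
        if (a1 != a2) {
          return a1 < a2 ? -1 : 1;
        }
        int c1 = readInt(b1, s1 + 4);  // second index is the next 4 bytes
        int c2 = readInt(b2, s2 + 4);
        return c1 == c2 ? 0 : c1 < c2 ? -1 : 1;
      }
    }
    static {
      WritableComparator.define(IndexIndexWritable.class, new Comparator());
    }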
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java Wed Apr 28 04:37:19 2010
@@ -26,30 +26,35 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.function.LongFloatProcedure;
 import org.apache.mahout.math.map.OpenLongFloatHashMap;
 
 public final class PartialMultiplyReducer extends MapReduceBase implements
-    Reducer<IntWritable,VectorOrPrefWritable,LongWritable, RandomAccessSparseVectorWritable> {
+    Reducer<IntWritable,VectorOrPrefWritable,LongWritable,VectorWritable> {
 
   @Override
   public void reduce(IntWritable key,
                      Iterator<VectorOrPrefWritable> values,
-                     final OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
+                     final OutputCollector<LongWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
+
     OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
     Vector cooccurrenceColumn = null;
     final int itemIndex = key.get();
     final LongWritable userIDWritable = new LongWritable();
-    final RandomAccessSparseVectorWritable vectorWritable = new RandomAccessSparseVectorWritable();
+    final VectorWritable vectorWritable = new VectorWritable();
+
     while (values.hasNext()) {
+
       VectorOrPrefWritable value = values.next();
       if (value.getVector() == null) {
+
         // Then this is a user-pref value
         long userID = value.getUserID();
         float preferenceValue = value.getValue();
+        
         if (cooccurrenceColumn == null) {
           // Haven't seen the co-occurrence column yet; save it
           savedValues.put(userID, preferenceValue);
@@ -62,8 +67,12 @@ public final class PartialMultiplyReduce
           vectorWritable.set(partialProduct);
           output.collect(userIDWritable, vectorWritable);
         }
+
       } else {
+
+        // Then this is the column vector
         cooccurrenceColumn = value.getVector();
+
         final Vector theColumn = cooccurrenceColumn;
         savedValues.forEachPair(new LongFloatProcedure() {
           @Override

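For orientation: the value computed here is one term of the item-based scoring sum — co-occurrence column i scaled by the user's preference for item i — which AggregateCombiner and AggregateAndRecommendReducer later total per user. Prefs and the column for the same item index arrive interleaved within one reduce group, so prefs seen before the column are parked in savedValues and replayed once it appears. The core step is presumably just a scalar-vector product along the lines of:

    // One partial product: the co-occurrence column scaled by the preference value
    Vector partialProduct = cooccurrenceColumn.times(preferenceValue);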
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed Apr 28 04:37:19 2010
@@ -19,6 +19,9 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
@@ -26,11 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -39,12 +40,11 @@ import org.apache.hadoop.mapred.TextOutp
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.MultipleInputs;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.VectorWritable;
 
 /**
  * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
@@ -98,34 +98,44 @@ public final class RecommenderJob extend
     String cooccurrencePath = tempDirPath + "/cooccurrence";
     String partialMultiplyPath = tempDirPath + "/partialMultiply";
 
+    AtomicInteger currentPhase = new AtomicInteger();
+
     JobConf itemIDIndexConf = prepareJobConf(
       inputPath, itemIDIndexPath, TextInputFormat.class,
       ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
       ItemIDIndexReducer.class, IntWritable.class, LongWritable.class,
-      MapFileOutputFormat.class);
-    JobClient.runJob(itemIDIndexConf);
+      SequenceFileOutputFormat.class);
+    itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);    
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      JobClient.runJob(itemIDIndexConf);
+    }
     
     JobConf toUserVectorConf = prepareJobConf(
       inputPath, userVectorPath, TextInputFormat.class,
       ToItemPrefsMapper.class, LongWritable.class, booleanData ? LongWritable.class : EntityPrefWritable.class,
-      ToUserVectorReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+      ToUserVectorReducer.class, LongWritable.class, VectorWritable.class,
       SequenceFileOutputFormat.class);
     toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
-    JobClient.runJob(toUserVectorConf);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      JobClient.runJob(toUserVectorConf);
+    }
 
     JobConf toCooccurrenceConf = prepareJobConf(
       userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
-      UserVectorToCooccurrenceMapper.class, IntWritable.class, EntityCountWritable.class,
-      UserVectorToCooccurrenceReducer.class, IntWritable.class, RandomAccessSparseVectorWritable.class,
+      UserVectorToCooccurrenceMapper.class, IndexIndexWritable.class, IntWritable.class,
+      UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
       SequenceFileOutputFormat.class);
-    toCooccurrenceConf.setInt("io.sort.mb", 600);
+    setIOSort(toCooccurrenceConf);
     toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
-    JobClient.runJob(toCooccurrenceConf);
+    toCooccurrenceConf.setClass("mapred.partitioner.class", FirstIndexPartitioner.class, Partitioner.class);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      JobClient.runJob(toCooccurrenceConf);
+    }
 
     JobConf partialMultiplyConf = prepareJobConf(
       cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
       CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
-      PartialMultiplyReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+      PartialMultiplyReducer.class, LongWritable.class, VectorWritable.class,
       SequenceFileOutputFormat.class);
     MultipleInputs.addInputPath(
         partialMultiplyConf,
@@ -138,22 +148,38 @@ public final class RecommenderJob extend
     if (usersFile != null) {
       partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
     }
-    JobClient.runJob(partialMultiplyConf);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      JobClient.runJob(partialMultiplyConf);
+    }
 
     JobConf aggregateAndRecommendConf = prepareJobConf(
         partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
-        IdentityMapper.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+        IdentityMapper.class, LongWritable.class, VectorWritable.class,
         AggregateAndRecommendReducer.class, LongWritable.class, RecommendedItemsWritable.class,
         TextOutputFormat.class);
-    aggregateAndRecommendConf.setInt("io.sort.mb", 600);
+    setIOSort(aggregateAndRecommendConf);
     aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
     aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
     aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
-    aggregateAndRecommendConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-    JobClient.runJob(aggregateAndRecommendConf);
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      JobClient.runJob(aggregateAndRecommendConf);
+    }
 
     return 0;
   }
+
+  private static void setIOSort(JobConf conf) {
+    conf.setInt("io.sort.factor", 100);
+    conf.setInt("io.sort.mb", 1000);
+    String javaOpts = conf.get("mapred.child.java.opts");
+    if (javaOpts != null) {
+      Matcher m = Pattern.compile("Xmx([0-9]+)m").matcher(javaOpts);
+      if (m.find()) {
+        int heapMB = Integer.parseInt(m.group(1));
+        conf.setInt("io.sort.mb", heapMB / 2);
+      }
+    }
+  }
   
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new RecommenderJob(), args);

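A note on the new phase gating: the five jobs above run as phases 0 through 4 in order (itemIDIndex, toUserVector, toCooccurrence, partialMultiply, aggregateAndRecommend). Passing --startPhase 3, for example, reuses the phase 0-2 outputs already under --tempDir and reruns only the partial-multiply and aggregation jobs, while --endPhase bounds the run from the other side. This makes a long pipeline restartable after a mid-run failure without recomputing everything.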
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Wed Apr 28 04:37:19 2010
@@ -19,19 +19,16 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
 
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 /**
  * <h1>Input</h1>
@@ -49,34 +46,19 @@ import org.apache.mahout.math.Vector;
  * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
  * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
  * </p>
- * 
- * <p>
- * The number of non-default elements actually retained in the user vector is capped at
- * {@link #MAX_PREFS_CONSIDERED}.
- * </p>
- * 
  */
 public final class ToUserVectorReducer extends MapReduceBase implements
-    Reducer<LongWritable,LongWritable,LongWritable, RandomAccessSparseVectorWritable> {
-  
-  public static final int MAX_PREFS_CONSIDERED = 20;
-  
-  private boolean booleanData;
-
-  @Override
-  public void configure(JobConf jobConf) {
-    booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
-  }
+    Reducer<LongWritable,LongWritable,LongWritable, VectorWritable> {
   
   @Override
   public void reduce(LongWritable userID,
                      Iterator<LongWritable> itemPrefs,
-                     OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
+                     OutputCollector<LongWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
     if (!itemPrefs.hasNext()) {
       return;
     }
-    RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
     while (itemPrefs.hasNext()) {
       LongWritable itemPref = itemPrefs.next();
       int index = ItemIDIndexMapper.idToIndex(itemPref.get());
@@ -89,38 +71,7 @@ public final class ToUserVectorReducer e
       userVector.set(index, value);
     }
 
-    if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
-      double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
-        userVector);
-      RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
-          MAX_PREFS_CONSIDERED);
-      Iterator<Vector.Element> it = userVector.iterateNonZero();
-      while (it.hasNext()) {
-        Vector.Element element = it.next();
-        if (element.get() >= cutoff) {
-          filteredVector.set(element.index(), element.get());
-        }
-      }
-      userVector = filteredVector;
-    }
-
-    RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable(userVector);
-    output.collect(userID, writable);
-  }
-
-  private static double findTopNPrefsCutoff(int n, Vector userVector) {
-    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
-    Iterator<Vector.Element> it = userVector.iterateNonZero();
-    while (it.hasNext()) {
-      double prefValue = it.next().get();
-      if (topPrefValues.size() < n) {
-        topPrefValues.add(prefValue);
-      } else if (prefValue > topPrefValues.peek()) {
-        topPrefValues.add(prefValue);
-        topPrefValues.poll();
-      }
-    }
-    return topPrefValues.peek();
+    output.collect(userID, new VectorWritable(userVector));
   }
   
 }

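Concretely, the reducer now emits one unbounded sparse vector per user. For a user with prefs item 123 → 3.0 and item 456 → 5.0, the output is roughly (illustration only; idToIndex hashes the long item ID down to an int index):

    Vector v = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    v.set(ItemIDIndexMapper.idToIndex(123L), 3.0);
    v.set(ItemIDIndexMapper.idToIndex(456L), 5.0);
    output.collect(userID, new VectorWritable(v));

The MAX_PREFS_CONSIDERED cap and top-N pruning removed here are not gone for good: a reworked, frequency-based version reappears in UserVectorToCooccurrenceMapper below, with the cap raised from 20 to 50.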
Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java (from r937991, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java&r1=937991&r2=938782&rev=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/cooccurence/TupleWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/TupleWritable.java Wed Apr 28 04:37:19 2010
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.cooccurence;
+package org.apache.mahout.cf.taste.hadoop.item;
 
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.DoubleWritable;

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Wed Apr 28 04:37:19 2010
@@ -32,11 +32,11 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.common.FileLineIterable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 public final class UserVectorSplitterMapper extends MapReduceBase implements
-    Mapper<LongWritable,RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+    Mapper<LongWritable, VectorWritable,IntWritable,VectorOrPrefWritable> {
 
   static final String USERS_FILE = "usersFile";
   
@@ -64,7 +64,7 @@ public final class UserVectorSplitterMap
 
   @Override
   public void map(LongWritable key,
-                  RandomAccessSparseVectorWritable value,
+                  VectorWritable value,
                   OutputCollector<IntWritable,VectorOrPrefWritable> output,
                   Reporter reporter) throws IOException {
     long userID = key.get();

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Wed Apr 28 04:37:19 2010
@@ -26,31 +26,86 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
-    Mapper<LongWritable, RandomAccessSparseVectorWritable,IntWritable, EntityCountWritable> {
-  
+    Mapper<LongWritable, VectorWritable,IndexIndexWritable,IntWritable> {
+
+  private static final int MAX_PREFS_CONSIDERED = 50;
+
+  private boolean outputGuardValue = true;
+  private final OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+
   @Override
   public void map(LongWritable userID,
-                  RandomAccessSparseVectorWritable userVector,
-                  OutputCollector<IntWritable,EntityCountWritable> output,
+                  VectorWritable userVectorWritable,
+                  OutputCollector<IndexIndexWritable,IntWritable> output,
                   Reporter reporter) throws IOException {
-    Iterator<Vector.Element> it = userVector.get().iterateNonZero();
-    EntityCountWritable entityCount = new EntityCountWritable();
-    IntWritable writable1 = new IntWritable();
+    Vector userVector = maybePruneUserVector(userVectorWritable.get());
+    countSeen(userVector);
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    IndexIndexWritable entityEntity = new IndexIndexWritable();
+    IntWritable one = new IntWritable(1);
     while (it.hasNext()) {
       int index1 = it.next().index();
-      writable1.set(index1);
-      Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
+      Iterator<Vector.Element> it2 = userVector.iterateNonZero();
       while (it2.hasNext()) {
         int index2 = it2.next().index();
-        entityCount.set(index2, 1);
-        output.collect(writable1, entityCount);
+        if (index1 != index2) {
+          entityEntity.set(index1, index2);
+          output.collect(entityEntity, one);
+        }
       }
     }
+    // Guard value, output once, sorts after everything; will be dropped by combiner
+    if (outputGuardValue) {
+      output.collect(new IndexIndexWritable(Integer.MAX_VALUE, Integer.MAX_VALUE), one);
+      outputGuardValue = false;
+    }
+  }
+
+  private Vector maybePruneUserVector(Vector userVector) {
+    if (userVector.getNumNondefaultElements() <= MAX_PREFS_CONSIDERED) {
+      return userVector;
+    }
+
+    OpenIntIntHashMap countCounts = new OpenIntIntHashMap();
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      int index = it.next().index();
+      int count = indexCounts.get(index);
+      countCounts.adjustOrPutValue(count, 1, 1);
+    }
+
+    int resultingSizeAtCutoff = 0;
+    int cutoff = 0;
+    while (resultingSizeAtCutoff <= MAX_PREFS_CONSIDERED) {
+      cutoff++;
+      int count = countCounts.get(cutoff);
+      resultingSizeAtCutoff += count;
+    }
+
+    Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+    while (it2.hasNext()) {
+      Vector.Element e = it2.next();
+      int index = e.index();
+      int count = indexCounts.get(index);
+      if (count > cutoff) {
+        e.set(0.0);
+      }
+    }
+
+    return userVector;
+  }
+
+  private void countSeen(Vector userVector) {
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      int index = it.next().index();
+      indexCounts.adjustOrPutValue(index, 1, 1);
+    }
   }
   
 }

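The pruning above is frequency-based rather than value-based: indexCounts tracks, per map task, how many times each item index has been seen so far, and countCounts histograms those frequencies over the current user's items. The cutoff loop finds the smallest frequency whose cumulative item total first exceeds MAX_PREFS_CONSIDERED, and items seen more often than that are zeroed out, on the theory that the most ubiquitous items carry the least co-occurrence signal. A worked example with hypothetical numbers: if a user's 60 items break down as 40 seen once, 15 seen twice, and 5 seen three or more times, countCounts is {1: 40, 2: 15, ...}; the loop stops at cutoff = 2 (40 + 15 = 55 > 50), the five most frequent items are zeroed, and 55 survive. Two caveats: the entire frequency class at the cutoff is kept, so the cap is approximate, and the counts accumulate over the task's input, so pruning depends on the order in which users are processed.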
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Wed Apr 28 04:37:19 2010
@@ -25,39 +25,53 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
-import org.apache.mahout.math.function.IntIntProcedure;
-import org.apache.mahout.math.map.OpenIntIntHashMap;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
-    Reducer<IntWritable, EntityCountWritable,IntWritable, RandomAccessSparseVectorWritable> {
+    Reducer<IndexIndexWritable,IntWritable,IntWritable,VectorWritable> {
+
+  private int lastItem1ID = Integer.MIN_VALUE;
+  private int lastItem2ID = Integer.MIN_VALUE;
+  private Vector cooccurrenceRow = null;
+  private int count = 0;
 
   @Override
-  public void reduce(IntWritable index1,
-                     Iterator<EntityCountWritable> index2s,
-                     OutputCollector<IntWritable,RandomAccessSparseVectorWritable> output,
+  public void reduce(IndexIndexWritable entityEntity,
+                     Iterator<IntWritable> counts,
+                     OutputCollector<IntWritable,VectorWritable> output,
                      Reporter reporter) throws IOException {
 
-    OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
-    while (index2s.hasNext()) {
-      EntityCountWritable writable = index2s.next();
-      int index = (int) writable.getID();
-      int oldValue = indexCounts.get(index);
-      indexCounts.put(index, oldValue + writable.getCount());
-    }
+    int item1ID = entityEntity.getAID();
+    int item2ID = entityEntity.getBID();
 
-    final RandomAccessSparseVector cooccurrenceRow =
-        new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
-    indexCounts.forEachPair(new IntIntProcedure() {
-      @Override
-      public boolean apply(int index, int count) {
-        cooccurrenceRow.set(index, count);
-        return true;
+    if (item1ID < lastItem1ID) {
+      throw new IllegalStateException();
+    }
+    if (item1ID == lastItem1ID) {
+      if (item2ID < lastItem2ID) {
+        throw new IllegalStateException();
       }
-    });
-    output.collect(index1, new RandomAccessSparseVectorWritable(cooccurrenceRow));
+      if (item2ID == lastItem2ID) {
+        count += CooccurrenceCombiner.sum(counts);
+      } else {
+        if (cooccurrenceRow == null) {
+          cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        }
+        cooccurrenceRow.set(lastItem2ID, count);
+        lastItem2ID = item2ID;
+        count = CooccurrenceCombiner.sum(counts);
+      }
+    } else {
+      if (cooccurrenceRow != null) {
+        // flush the pending (lastItem2ID, count) entry before emitting the row
+        cooccurrenceRow.set(lastItem2ID, count);
+        output.collect(new IntWritable(lastItem1ID), new VectorWritable(cooccurrenceRow));
+      }
+      lastItem1ID = item1ID;
+      lastItem2ID = item2ID;
+      cooccurrenceRow = null;
+      count = CooccurrenceCombiner.sum(counts);
+    }
   }
   
 }
\ No newline at end of file

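A caveat on the streaming row assembly: the row in progress when the task's input ends is emitted only if the mapper's guard record reaches this reducer, and the combiner above normally swallows it. The usual old-API safety net is to stash the OutputCollector in a field during reduce() and flush in close(); a sketch under that assumption (illustration only — 'collector' is a hypothetical field saved on each reduce() call):

    @Override
    public void close() throws IOException {
      if (lastItem1ID != Integer.MIN_VALUE && lastItem1ID != Integer.MAX_VALUE) {
        if (cooccurrenceRow == null) {
          cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE);
        }
        cooccurrenceRow.set(lastItem2ID, count);  // pending entry
        collector.collect(new IntWritable(lastItem1ID), new VectorWritable(cooccurrenceRow));
      }
    }

The Integer.MAX_VALUE check keeps the guard record itself from being emitted as a row.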
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Wed Apr 28 04:37:19 2010
@@ -23,20 +23,19 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
 
 public final class VectorOrPrefWritable implements Writable {
 
-  private RandomAccessSparseVector vector;
+  private Vector vector;
   private long userID;
   private float value;
 
   public VectorOrPrefWritable() {
   }
 
-  public VectorOrPrefWritable(RandomAccessSparseVector vector) {
+  public VectorOrPrefWritable(Vector vector) {
     this.vector = vector;
   }
 
@@ -57,7 +56,7 @@ public final class VectorOrPrefWritable 
     return value;
   }
 
-  public void set(RandomAccessSparseVector vector) {
+  public void set(Vector vector) {
     this.vector = vector;
     this.userID = Long.MIN_VALUE;
     this.value = Float.NaN;
@@ -77,7 +76,7 @@ public final class VectorOrPrefWritable 
       out.writeFloat(value);
     } else {
       out.writeBoolean(true);
-      new RandomAccessSparseVectorWritable(vector).write(out);
+      new VectorWritable(vector).write(out);
     }
   }
 
@@ -85,9 +84,9 @@ public final class VectorOrPrefWritable 
   public void readFields(DataInput in) throws IOException {
     boolean hasVector = in.readBoolean();
     if (hasVector) {
-      RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable();
+      VectorWritable writable = new VectorWritable();
       writable.readFields(in);
-      set((RandomAccessSparseVector) writable.get());
+      set(writable.get());
     } else {
       long theUserID = in.readLong();
       float theValue = in.readFloat();

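VectorOrPrefWritable is a small tagged union: write() emits a boolean discriminator followed by either the (userID, value) pref pair or a serialized vector. A round-trip sketch using Hadoop's in-memory buffers, for illustration only:

    VectorOrPrefWritable before =
        new VectorOrPrefWritable(new RandomAccessSparseVector(Integer.MAX_VALUE));
    DataOutputBuffer out = new DataOutputBuffer();
    before.write(out);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    VectorOrPrefWritable after = new VectorOrPrefWritable();
    after.readFields(in);  // after.getVector() is non-null; pref fields are unset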
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=938782&r1=938781&r2=938782&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Wed Apr 28 04:37:19 2010
@@ -20,6 +20,7 @@ package org.apache.mahout.common;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli2.Argument;
 import org.apache.commons.cli2.CommandLine;
@@ -93,18 +94,25 @@ public abstract class AbstractJob extend
       argBuilder = argBuilder.withDefault(defaultValue);
     }
     Argument arg = argBuilder.create();
-    return new DefaultOptionBuilder().withLongName(name).withRequired(required).withShortName(shortName)
-        .withArgument(arg).withDescription(description).create();
+    DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withRequired(required)
+        .withArgument(arg).withDescription(description);
+    if (shortName != null) {
+      optBuilder = optBuilder.withShortName(shortName);
+    }
+    return optBuilder.create();
   }
   
   protected static Map<String,String> parseArguments(String[] args, Option... extraOpts) {
     
-    Option tempDirOpt = buildOption("tempDir", "t", "Intermediate output directory", "temp");
+    Option tempDirOpt = buildOption("tempDir", null, "Intermediate output directory", "temp");
     Option helpOpt = DefaultOptionCreator.helpOption();
+    Option startPhase = buildOption("startPhase", null, "First phase to run", "0");
+    Option endPhase = buildOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE));
 
     GroupBuilder gBuilder = new GroupBuilder().withName("Options")
         .withOption(tempDirOpt)
-        .withOption(helpOpt);
+        .withOption(helpOpt)
+        .withOption(startPhase).withOption(endPhase);
     
     for (Option opt : extraOpts) {
       gBuilder = gBuilder.withOption(opt);
@@ -143,6 +151,14 @@ public abstract class AbstractJob extend
       }
     }
   }
+
+  protected static boolean shouldRunNextPhase(Map<String,String> args, AtomicInteger currentPhase) {
+    int phase = currentPhase.getAndIncrement();
+    String startPhase = args.get("--startPhase");
+    String endPhase = args.get("--endPhase");
+    return !((startPhase != null && phase < Integer.parseInt(startPhase)) ||
+             (endPhase != null && phase > Integer.parseInt(endPhase)));
+  }
   
   protected JobConf prepareJobConf(String inputPath,
                                    String outputPath,
@@ -172,11 +188,7 @@ public abstract class AbstractJob extend
     jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
     jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
     jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
-    if (jobConf.get("mapred.output.compress") == null) {
-      jobConf.setBoolean("mapred.output.compress", true);
-      // otherwise leave it to its default value
-    }
-    jobConf.setCompressMapOutput(true);
+    jobConf.setBoolean("mapred.compress.map.output", true);
 
     String customJobName = jobConf.get("mapred.job.name");
     if (customJobName == null) {