Posted to commits@mahout.apache.org by sr...@apache.org on 2010/04/23 11:34:58 UTC

svn commit: r937210 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout: cf/taste/hadoop/ cf/taste/hadoop/item/ cf/taste/hadoop/similarity/item/ common/ math/

Author: srowen
Date: Fri Apr 23 09:34:57 2010
New Revision: 937210

URL: http://svn.apache.org/viewvc?rev=937210&view=rev
Log:
Overhauled the implementation entirely; the matrix multiply is now more fully distributed
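
The change, in brief: a user's recommendation vector is the item-item cooccurrence
matrix times that user's preference vector. Rather than loading the whole matrix into
each mapper, the jobs below distribute the product by column: a reducer sees one
cooccurrence column alongside every preference for that item, emits the scaled column
keyed by user ID, and a final job sums the partial products per user. A minimal sketch
of the per-column step, using Mahout's Vector math with illustrative names (this is
not the committed reducer code):

    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.Vector;

    public final class PartialMultiplySketch {

      // One (item, user) contribution: scale the item's cooccurrence column by
      // the user's preference for it, masking the item itself out of the result.
      static Vector partialProduct(Vector cooccurrenceColumn, int itemIndex, float preferenceValue) {
        Vector partial = cooccurrenceColumn.times(preferenceValue);
        partial.set(itemIndex, Double.NEGATIVE_INFINITY); // never recommend the item back
        return partial;
      }

      public static void main(String[] args) {
        Vector columnA = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        columnA.set(5, 3.0); // item A co-occurred 3 times with item index 5
        Vector columnB = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        columnB.set(5, 1.0); // item B co-occurred once with item index 5
        // One user rated items A (index 2) and B (index 4); each rating yields a
        // partial product, and AggregateAndRecommendReducer sums them per user.
        Vector recommendation =
            partialProduct(columnA, 2, 2.5f).plus(partialProduct(columnB, 4, 1.0f));
        System.out.println(recommendation);
      }
    }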

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java
      - copied, changed from r936233, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java
      - copied, changed from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
      - copied, changed from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java

Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java (from r936233, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java&r1=936233&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityCountWritable.java Fri Apr 23 09:34:57 2010
@@ -23,69 +23,73 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.mahout.common.RandomUtils;
+import org.apache.hadoop.io.WritableUtils;
 
-/** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends LongWritable {
-  
-  private float prefValue;
-  
-  public EntityPrefWritable() {
+/** A {@link Writable} encapsulating an item ID and a count. */
+public final class EntityCountWritable extends LongWritable implements Cloneable {
+
+  private int count;
+
+  public EntityCountWritable() {
     // do nothing
   }
-  
-  public EntityPrefWritable(long itemID, float prefValue) {
+
+  public EntityCountWritable(long itemID, int count) {
     super(itemID);
-    this.prefValue = prefValue;
+    this.count = count;
   }
-  
-  public EntityPrefWritable(EntityPrefWritable other) {
-    this(other.get(), other.getPrefValue());
+
+  public EntityCountWritable(EntityCountWritable other) {
+    this(other.get(), other.getCount());
   }
 
   public long getID() {
     return get();
   }
 
-  public float getPrefValue() {
-    return prefValue;
+  public int getCount() {
+    return count;
   }
-  
+
+  public void set(long id, int count) {
+    super.set(id);
+    this.count = count;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeFloat(prefValue);
+    WritableUtils.writeVInt(out, count);
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    prefValue = in.readFloat();
-  }
-  
-  public static EntityPrefWritable read(DataInput in) throws IOException {
-    EntityPrefWritable writable = new EntityPrefWritable();
-    writable.readFields(in);
-    return writable;
+    count = WritableUtils.readVInt(in);
   }
 
   @Override
   public int hashCode() {
-    return super.hashCode() ^ RandomUtils.hashFloat(prefValue);
+    return super.hashCode() ^ count;
   }
 
   @Override
   public boolean equals(Object o) {
-    if (!(o instanceof EntityPrefWritable)) {
+    if (!(o instanceof EntityCountWritable)) {
       return false;
     }
-    EntityPrefWritable other = (EntityPrefWritable) o;
-    return get() == other.get() && prefValue == other.getPrefValue();
+    EntityCountWritable other = (EntityCountWritable) o;
+    return get() == other.get() && count == other.getCount();
+  }
+
+  @Override
+  public String toString() {
+    return get() + "\t" + count;
   }
 
   @Override
-  public EntityPrefWritable clone() {
-    return new EntityPrefWritable(get(), prefValue);
+  public EntityCountWritable clone() {
+    return new EntityCountWritable(get(), count);
   }
-  
+
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityEntityWritable.java Fri Apr 23 09:34:57 2010
@@ -25,7 +25,8 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.mahout.common.RandomUtils;
 
 /** A {@link WritableComparable} encapsulating two items. */
-public final class EntityEntityWritable implements WritableComparable<EntityEntityWritable> {
+public final class EntityEntityWritable
+    implements WritableComparable<EntityEntityWritable>, Cloneable {
   
   private long aID;
   private long bID;
@@ -46,6 +47,11 @@ public final class EntityEntityWritable 
   public long getBID() {
     return bID;
   }
+
+  public void set(long aID, long bID) {
+    this.aID = aID;
+    this.bID = bID;
+  }
   
   @Override
   public void write(DataOutput out) throws IOException {
@@ -59,12 +65,6 @@ public final class EntityEntityWritable 
     bID = in.readLong();
   }
   
-  public static EntityEntityWritable read(DataInput in) throws IOException {
-    EntityEntityWritable writable = new EntityEntityWritable();
-    writable.readFields(in);
-    return writable;
-  }
-  
   @Override
   public int compareTo(EntityEntityWritable that) {
     int aCompare = compare(aID, that.getAID());
@@ -93,5 +93,10 @@ public final class EntityEntityWritable 
   public String toString() {
     return aID + "\t" + bID;
   }
+
+  @Override
+  public EntityEntityWritable clone() {
+    return new EntityEntityWritable(aID, bID);
+  }
   
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/EntityPrefWritable.java Fri Apr 23 09:34:57 2010
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.mahout.common.RandomUtils;
 
 /** A {@link Writable} encapsulating an item ID and a preference value. */
-public final class EntityPrefWritable extends LongWritable {
+public final class EntityPrefWritable extends LongWritable implements Cloneable {
   
   private float prefValue;
   
@@ -50,6 +50,11 @@ public final class EntityPrefWritable ex
   public float getPrefValue() {
     return prefValue;
   }
+
+  public void set(long id, float prefValue) {
+    super.set(id);
+    this.prefValue = prefValue;
+  }
   
   @Override
   public void write(DataOutput out) throws IOException {
@@ -62,12 +67,6 @@ public final class EntityPrefWritable ex
     super.readFields(in);
     prefValue = in.readFloat();
   }
-  
-  public static EntityPrefWritable read(DataInput in) throws IOException {
-    EntityPrefWritable writable = new EntityPrefWritable();
-    writable.readFields(in);
-    return writable;
-  }
 
   @Override
   public int hashCode() {
@@ -84,6 +83,11 @@ public final class EntityPrefWritable ex
   }
 
   @Override
+  public String toString() {
+    return get() + "\t" + prefValue;
+  }
+
+  @Override
   public EntityPrefWritable clone() {
     return new EntityPrefWritable(get(), prefValue);
   }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Fri Apr 23 09:34:57 2010
@@ -47,6 +47,10 @@ public final class RecommendedItemsWrita
   public List<RecommendedItem> getRecommendedItems() {
     return recommended;
   }
+
+  public void set(List<RecommendedItem> recommended) {
+    this.recommended = recommended;
+  }
   
   @Override
   public void write(DataOutput out) throws IOException {
@@ -70,12 +74,6 @@ public final class RecommendedItemsWrita
     }
   }
   
-  public static RecommendedItemsWritable read(DataInput in) throws IOException {
-    RecommendedItemsWritable writable = new RecommendedItemsWritable();
-    writable.readFields(in);
-    return writable;
-  }
-  
   @Override
   public String toString() {
     StringBuilder result = new StringBuilder(200);

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
+import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+public final class AggregateAndRecommendReducer extends MapReduceBase implements
+    Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RecommendedItemsWritable> {
+
+  static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
+  static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
+
+  private int recommendationsPerUser;
+  private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+
+  @Override
+  public void configure(JobConf jobConf) {
+    try {
+      FileSystem fs = FileSystem.get(jobConf);
+      Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
+      recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
+      indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public void reduce(LongWritable key,
+                     Iterator<RandomAccessSparseVectorWritable> values,
+                     OutputCollector<LongWritable, RecommendedItemsWritable> output,
+                     Reporter reporter) throws IOException {
+    if (!values.hasNext()) {
+      return;
+    }
+    Vector recommendationVector = values.next().get();
+    while (values.hasNext()) {
+      recommendationVector = recommendationVector.plus(values.next().get());
+    }
+
+    Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
+        Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
+
+    Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
+    LongWritable itemID = new LongWritable();
+    while (recommendationVectorIterator.hasNext()) {
+      Vector.Element element = recommendationVectorIterator.next();
+      int index = element.index();
+      if (topItems.size() < recommendationsPerUser) {
+        LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
+        if (theItemID != null) {
+          topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
+        } // else the item ID index has no entry for this index; skip it
+      } else if (element.get() > topItems.peek().getValue()) {
+        LongWritable theItemID = indexItemIDMap.get(new IntWritable(index), itemID);
+        if (theItemID != null) {
+          topItems.add(new GenericRecommendedItem(theItemID.get(), (float) element.get()));
+          topItems.poll();
+        } // else the item ID index has no entry for this index; skip it
+      }
+    }
+
+    if (!topItems.isEmpty()) {
+      List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
+      recommendations.addAll(topItems);
+      Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
+      output.collect(key, new RecommendedItemsWritable(recommendations));
+    }
+  }
+
+}
\ No newline at end of file

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+public final class AggregateCombiner extends MapReduceBase implements
+    Reducer<LongWritable,RandomAccessSparseVectorWritable,LongWritable,RandomAccessSparseVectorWritable> {
+
+  @Override
+  public void reduce(LongWritable key,
+                     Iterator<RandomAccessSparseVectorWritable> values,
+                     OutputCollector<LongWritable, RandomAccessSparseVectorWritable> output,
+                     Reporter reporter) throws IOException {
+    if (!values.hasNext()) {
+      return;
+    }
+    Vector partial = values.next().get();
+    while (values.hasNext()) {
+      partial = partial.plus(values.next().get());
+    }
+    output.collect(key, new RandomAccessSparseVectorWritable((RandomAccessSparseVector) partial));
+  }
+
+}
\ No newline at end of file

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+
+public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
+    Mapper<IntWritable,RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+
+  @Override
+  public void map(IntWritable key,
+                  RandomAccessSparseVectorWritable value,
+                  OutputCollector<IntWritable,VectorOrPrefWritable> output,
+                  Reporter reporter) throws IOException {
+    output.collect(key, new VectorOrPrefWritable((RandomAccessSparseVector) value.get()));
+  }
+
+}
\ No newline at end of file

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceCombiner.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
+import org.apache.mahout.math.function.IntIntProcedure;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+public final class CooccurrenceCombiner extends MapReduceBase implements
+    Reducer<IntWritable,EntityCountWritable,IntWritable,EntityCountWritable> {
+
+  @Override
+  public void reduce(final IntWritable index1,
+                     Iterator<EntityCountWritable> index2s,
+                     final OutputCollector<IntWritable,EntityCountWritable> output,
+                     Reporter reporter) {
+
+    OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+    while (index2s.hasNext()) {
+      EntityCountWritable writable = index2s.next();
+      int index = (int) writable.getID();
+      int oldValue = indexCounts.get(index);
+      indexCounts.put(index, oldValue + writable.getCount());
+    }
+
+    final EntityCountWritable entityCount = new EntityCountWritable();
+    indexCounts.forEachPair(new IntIntProcedure() {
+      @Override
+      public boolean apply(int index, int count) {
+        entityCount.set(index, count);
+        try {
+          output.collect(index1, entityCount);
+        } catch (IOException ioe) {
+          throw new IllegalStateException(ioe);
+        }
+        return true;
+      }
+    });
+  }
+}
\ No newline at end of file

Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java (from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java&r1=936184&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyReducer.java Fri Apr 23 09:34:57 2010
@@ -19,107 +19,72 @@ package org.apache.mahout.cf.taste.hadoo
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
-import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.LongFloatProcedure;
+import org.apache.mahout.math.map.OpenLongFloatHashMap;
 
-/**
- * <h1>Input</h1>
- * 
- * <p>
- * Takes user IDs as {@link LongWritable} mapped to all associated item IDs and preference values, as
- * {@link EntityPrefWritable}s.
- * </p>
- * 
- * <h1>Output</h1>
- * 
- * <p>
- * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
- * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
- * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for later with a combination of
- * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
- * </p>
- * 
- * <p>
- * The number of non-default elements actually retained in the user vector is capped at
- * {@link #MAX_PREFS_CONSIDERED}.
- * </p>
- * 
- */
-public final class ToUserVectorReducer extends MapReduceBase implements
-    Reducer<LongWritable,LongWritable,LongWritable,VectorWritable> {
-  
-  public static final int MAX_PREFS_CONSIDERED = 20;
-  
-  private boolean booleanData;
+public final class PartialMultiplyReducer extends MapReduceBase implements
+    Reducer<IntWritable,VectorOrPrefWritable,LongWritable,RandomAccessSparseVectorWritable> {
 
   @Override
-  public void configure(JobConf jobConf) {
-    booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
-  }
-  
-  @Override
-  public void reduce(LongWritable userID,
-                     Iterator<LongWritable> itemPrefs,
-                     OutputCollector<LongWritable,VectorWritable> output,
+  public void reduce(IntWritable key,
+                     Iterator<VectorOrPrefWritable> values,
+                     final OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
                      Reporter reporter) throws IOException {
-    if (itemPrefs.hasNext()) {
-      RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-      while (itemPrefs.hasNext()) {
-        LongWritable itemPref = itemPrefs.next();
-        int index = ItemIDIndexMapper.idToIndex(itemPref.get());
-        float value;
-        if (itemPref instanceof EntityPrefWritable) {
-          value = ((EntityPrefWritable) itemPref).getPrefValue();
+    OpenLongFloatHashMap savedValues = new OpenLongFloatHashMap();
+    Vector cooccurrenceColumn = null;
+    final int itemIndex = key.get();
+    final LongWritable userIDWritable = new LongWritable();
+    final RandomAccessSparseVectorWritable vectorWritable = new RandomAccessSparseVectorWritable();
+    while (values.hasNext()) {
+      VectorOrPrefWritable value = values.next();
+      if (value.getVector() == null) {
+        // Then this is a user-pref value
+        long userID = value.getUserID();
+        float preferenceValue = value.getValue();
+        if (cooccurrenceColumn == null) {
+          // Haven't seen the co-occurrence column yet; save it
+          savedValues.put(userID, preferenceValue);
         } else {
-          value = 1.0f;
+          // Have seen it
+          Vector partialProduct = cooccurrenceColumn.times(preferenceValue);
+          // This makes sure this item isn't recommended for this user:
+          partialProduct.set(itemIndex, Double.NEGATIVE_INFINITY);
+          userIDWritable.set(userID);
+          vectorWritable.set(partialProduct);
+          output.collect(userIDWritable, vectorWritable);
         }
-        userVector.set(index, value);
-      }
-      
-      if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
-        double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
-          userVector);
-        RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
-            MAX_PREFS_CONSIDERED);
-        Iterator<Vector.Element> it = userVector.iterateNonZero();
-        while (it.hasNext()) {
-          Vector.Element element = it.next();
-          if (element.get() >= cutoff) {
-            filteredVector.set(element.index(), element.get());
+      } else {
+        cooccurrenceColumn = value.getVector();
+        final Vector theColumn = cooccurrenceColumn;
+        savedValues.forEachPair(new LongFloatProcedure() {
+          @Override
+          public boolean apply(long userID, float value) {
+            Vector partialProduct = theColumn.times(value);
+            // This makes sure this item isn't recommended for this user:
+            partialProduct.set(itemIndex, Double.NEGATIVE_INFINITY);
+            userIDWritable.set(userID);
+            vectorWritable.set(partialProduct);
+            try {
+              output.collect(userIDWritable, vectorWritable);
+            } catch (IOException ioe) {
+              throw new IllegalStateException(ioe);
+            }
+            return true;
           }
-        }
-        userVector = filteredVector;
-      }
-
-      VectorWritable writable = new VectorWritable(userVector);
-      output.collect(userID, writable);
-    }
-  }
-  
-  private static double findTopNPrefsCutoff(int n, Vector userVector) {
-    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
-    Iterator<Vector.Element> it = userVector.iterateNonZero();
-    while (it.hasNext()) {
-      double prefValue = it.next().get();
-      if (topPrefValues.size() < n) {
-        topPrefValues.add(prefValue);
-      } else if (prefValue > topPrefValues.peek()) {
-        topPrefValues.add(prefValue);
-        topPrefValues.poll();
+        });
+        savedValues.clear();
       }
     }
-    return topPrefValues.peek();
+
   }
-  
-}
+
+}
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Apr 23 09:34:57 2010
@@ -22,6 +22,8 @@ import java.util.Map;
 
 import org.apache.commons.cli2.Option;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -29,17 +31,20 @@ import org.apache.hadoop.io.compress.Gzi
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 
 /**
  * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
@@ -91,38 +96,62 @@ public final class RecommenderJob extend
     String userVectorPath = tempDirPath + "/userVectors";
     String itemIDIndexPath = tempDirPath + "/itemIDIndex";
     String cooccurrencePath = tempDirPath + "/cooccurrence";
-    
-    JobConf itemIDIndexConf = prepareJobConf(inputPath, itemIDIndexPath,
-      TextInputFormat.class, ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
-      ItemIDIndexReducer.class, IntWritable.class, LongWritable.class, MapFileOutputFormat.class);
+    String partialMultiplyPath = tempDirPath + "/partialMultiply";
+
+    JobConf itemIDIndexConf = prepareJobConf(
+      inputPath, itemIDIndexPath, TextInputFormat.class,
+      ItemIDIndexMapper.class, IntWritable.class, LongWritable.class,
+      ItemIDIndexReducer.class, IntWritable.class, LongWritable.class,
+      MapFileOutputFormat.class);
     JobClient.runJob(itemIDIndexConf);
     
-    JobConf toUserVectorConf = prepareJobConf(inputPath, userVectorPath,
-      TextInputFormat.class, ToItemPrefsMapper.class, LongWritable.class,
-      booleanData ? LongWritable.class : EntityPrefWritable.class,
-      ToUserVectorReducer.class, LongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
+    JobConf toUserVectorConf = prepareJobConf(
+      inputPath, userVectorPath, TextInputFormat.class,
+      ToItemPrefsMapper.class, LongWritable.class, booleanData ? LongWritable.class : EntityPrefWritable.class,
+      ToUserVectorReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+      SequenceFileOutputFormat.class);
     toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
     JobClient.runJob(toUserVectorConf);
-    
-    JobConf toCooccurrenceConf = prepareJobConf(userVectorPath, cooccurrencePath,
-      SequenceFileInputFormat.class, UserVectorToCooccurrenceMapper.class, IntWritable.class,
-      IntWritable.class, UserVectorToCooccurrenceReducer.class, IntWritable.class, VectorWritable.class,
-      MapFileOutputFormat.class);
+
+    JobConf toCooccurrenceConf = prepareJobConf(
+      userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
+      UserVectorToCooccurrenceMapper.class, IntWritable.class, EntityCountWritable.class,
+      UserVectorToCooccurrenceReducer.class, IntWritable.class, RandomAccessSparseVectorWritable.class,
+      SequenceFileOutputFormat.class);
+    toCooccurrenceConf.setInt("io.sort.mb", 600);
+    toCooccurrenceConf.setClass("mapred.combiner.class", CooccurrenceCombiner.class, Reducer.class);
     JobClient.runJob(toCooccurrenceConf);
-    
-    JobConf recommenderConf = prepareJobConf(userVectorPath, outputPath,
-      SequenceFileInputFormat.class, RecommenderMapper.class, LongWritable.class,
-      RecommendedItemsWritable.class, IdentityReducer.class, LongWritable.class,
-      RecommendedItemsWritable.class, TextOutputFormat.class);
-    recommenderConf.setBoolean(BOOLEAN_DATA, booleanData);    
-    recommenderConf.set(RecommenderMapper.COOCCURRENCE_PATH, cooccurrencePath);
-    recommenderConf.set(RecommenderMapper.ITEMID_INDEX_PATH, itemIDIndexPath);
-    recommenderConf.setInt(RecommenderMapper.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+
+    JobConf partialMultiplyConf = prepareJobConf(
+      cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
+      CooccurrenceColumnWrapperMapper.class, IntWritable.class, VectorOrPrefWritable.class,
+      PartialMultiplyReducer.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+      SequenceFileOutputFormat.class);
+    MultipleInputs.addInputPath(
+        partialMultiplyConf,
+        new Path(cooccurrencePath).makeQualified(FileSystem.get(partialMultiplyConf)),
+        SequenceFileInputFormat.class, CooccurrenceColumnWrapperMapper.class);
+    MultipleInputs.addInputPath(
+        partialMultiplyConf,
+        new Path(userVectorPath).makeQualified(FileSystem.get(partialMultiplyConf)),
+        SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
     if (usersFile != null) {
-      recommenderConf.set(RecommenderMapper.USERS_FILE, usersFile);
+      partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
     }
-    recommenderConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-    JobClient.runJob(recommenderConf);
+    JobClient.runJob(partialMultiplyConf);
+
+    JobConf aggregateAndRecommendConf = prepareJobConf(
+        partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+        IdentityMapper.class, LongWritable.class, RandomAccessSparseVectorWritable.class,
+        AggregateAndRecommendReducer.class, LongWritable.class, RecommendedItemsWritable.class,
+        TextOutputFormat.class);
+    aggregateAndRecommendConf.setInt("io.sort.mb", 600);
+    aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
+    aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
+    aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+    aggregateAndRecommendConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
+    JobClient.runJob(aggregateAndRecommendConf);
+
     return 0;
   }
   

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Fri Apr 23 09:34:57 2010
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
 
 /**
  * <h1>Input</h1>
@@ -57,7 +57,7 @@ import org.apache.mahout.math.VectorWrit
  * 
  */
 public final class ToUserVectorReducer extends MapReduceBase implements
-    Reducer<LongWritable,LongWritable,LongWritable,VectorWritable> {
+    Reducer<LongWritable,LongWritable,LongWritable,RandomAccessSparseVectorWritable> {
   
   public static final int MAX_PREFS_CONSIDERED = 20;
   
@@ -71,42 +71,43 @@ public final class ToUserVectorReducer e
   @Override
   public void reduce(LongWritable userID,
                      Iterator<LongWritable> itemPrefs,
-                     OutputCollector<LongWritable,VectorWritable> output,
+                     OutputCollector<LongWritable,RandomAccessSparseVectorWritable> output,
                      Reporter reporter) throws IOException {
-    if (itemPrefs.hasNext()) {
-      RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
-      while (itemPrefs.hasNext()) {
-        LongWritable itemPref = itemPrefs.next();
-        int index = ItemIDIndexMapper.idToIndex(itemPref.get());
-        float value;
-        if (itemPref instanceof EntityPrefWritable) {
-          value = ((EntityPrefWritable) itemPref).getPrefValue();
-        } else {
-          value = 1.0f;
-        }
-        userVector.set(index, value);
+    if (!itemPrefs.hasNext()) {
+      return;
+    }
+    RandomAccessSparseVector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    while (itemPrefs.hasNext()) {
+      LongWritable itemPref = itemPrefs.next();
+      int index = ItemIDIndexMapper.idToIndex(itemPref.get());
+      float value;
+      if (itemPref instanceof EntityPrefWritable) {
+        value = ((EntityPrefWritable) itemPref).getPrefValue();
+      } else {
+        value = 1.0f;
       }
-      
-      if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
-        double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
-          userVector);
-        RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
-            MAX_PREFS_CONSIDERED);
-        Iterator<Vector.Element> it = userVector.iterateNonZero();
-        while (it.hasNext()) {
-          Vector.Element element = it.next();
-          if (element.get() >= cutoff) {
-            filteredVector.set(element.index(), element.get());
-          }
+      userVector.set(index, value);
+    }
+
+    if (!booleanData && userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
+      double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED,
+        userVector);
+      RandomAccessSparseVector filteredVector = new RandomAccessSparseVector(Integer.MAX_VALUE,
+          MAX_PREFS_CONSIDERED);
+      Iterator<Vector.Element> it = userVector.iterateNonZero();
+      while (it.hasNext()) {
+        Vector.Element element = it.next();
+        if (element.get() >= cutoff) {
+          filteredVector.set(element.index(), element.get());
         }
-        userVector = filteredVector;
       }
-
-      VectorWritable writable = new VectorWritable(userVector);
-      output.collect(userID, writable);
+      userVector = filteredVector;
     }
+
+    RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable(userVector);
+    output.collect(userID, writable);
   }
-  
+
   private static double findTopNPrefsCutoff(int n, Vector userVector) {
     Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
     Iterator<Vector.Element> it = userVector.iterateNonZero();

Copied: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (from r936184, lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java)
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?p2=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java&p1=lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java&r1=936184&r2=937210&rev=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Fri Apr 23 09:34:57 2010
@@ -18,34 +18,71 @@
 package org.apache.mahout.cf.taste.hadoop.item;
 
 import java.io.IOException;
-import java.util.regex.Pattern;
+import java.util.Iterator;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
 
-public final class ItemIDIndexMapper extends MapReduceBase implements
-    Mapper<LongWritable,Text,IntWritable,LongWritable> {
-  
-  private static final Pattern COMMA = Pattern.compile(",");
+public final class UserVectorSplitterMapper extends MapReduceBase implements
+    Mapper<LongWritable,RandomAccessSparseVectorWritable,IntWritable,VectorOrPrefWritable> {
+
+  static final String USERS_FILE = "usersFile";
   
+  private FastIDSet usersToRecommendFor;
+
+  @Override
+  public void configure(JobConf jobConf) {
+    try {
+      FileSystem fs = FileSystem.get(jobConf);
+      String usersFilePathString = jobConf.get(USERS_FILE);
+      if (usersFilePathString == null) {
+        usersToRecommendFor = null;
+      } else {
+        usersToRecommendFor = new FastIDSet();
+        Path usersFilePath = new Path(usersFilePathString).makeQualified(fs);
+        FSDataInputStream in = fs.open(usersFilePath);
+        for (String line : new FileLineIterable(in)) {
+          usersToRecommendFor.add(Long.parseLong(line));
+        }
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
   @Override
   public void map(LongWritable key,
-                  Text value,
-                  OutputCollector<IntWritable,LongWritable> output,
+                  RandomAccessSparseVectorWritable value,
+                  OutputCollector<IntWritable,VectorOrPrefWritable> output,
                   Reporter reporter) throws IOException {
-    String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
-    long itemID = Long.parseLong(tokens[1]);
-    int index = idToIndex(itemID);
-    output.collect(new IntWritable(index), new LongWritable(itemID));
-  }
-  
-  static int idToIndex(long itemID) {
-    return 0x7FFFFFFF & ((int) itemID ^ (int) (itemID >>> 32));
+    long userID = key.get();
+    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
+      return;
+    }
+    Vector userVector = value.get();
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    IntWritable itemIndexWritable = new IntWritable();
+    VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+    while (it.hasNext()) {
+      Vector.Element e = it.next();
+      int itemIndex = e.index();
+      double preferenceValue = e.get();
+      itemIndexWritable.set(itemIndex);
+      vectorOrPref.set(userID, (float) preferenceValue);
+      output.collect(itemIndexWritable, vectorOrPref);
+    }
   }
-  
+
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Fri Apr 23 09:34:57 2010
@@ -26,29 +26,29 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
 
 public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
-    Mapper<LongWritable,VectorWritable,IntWritable,IntWritable> {
+    Mapper<LongWritable,RandomAccessSparseVectorWritable,IntWritable,EntityCountWritable> {
   
   @Override
   public void map(LongWritable userID,
-                  VectorWritable userVector,
-                  OutputCollector<IntWritable,IntWritable> output,
+                  RandomAccessSparseVectorWritable userVector,
+                  OutputCollector<IntWritable,EntityCountWritable> output,
                   Reporter reporter) throws IOException {
     Iterator<Vector.Element> it = userVector.get().iterateNonZero();
+    EntityCountWritable entityCount = new EntityCountWritable();
+    IntWritable writable1 = new IntWritable();
     while (it.hasNext()) {
-      Vector.Element next1 = it.next();
-      int index1 = next1.index();
+      int index1 = it.next().index();
+      writable1.set(index1);
       Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
-      IntWritable itemWritable1 = new IntWritable(index1);
       while (it2.hasNext()) {
-        Vector.Element next2 = it2.next();
-        int index2 = next2.index();
-        if (index1 != index2) {
-          output.collect(itemWritable1, new IntWritable(index2));
-        }
+        int index2 = it2.next().index();
+        entityCount.set(index2, 1);
+        output.collect(writable1, entityCount);
       }
     }
   }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Fri Apr 23 09:34:57 2010
@@ -25,34 +25,39 @@ import org.apache.hadoop.mapred.MapReduc
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.EntityCountWritable;
 import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.function.IntIntProcedure;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
-    Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {
+    Reducer<IntWritable,EntityCountWritable,IntWritable,RandomAccessSparseVectorWritable> {
 
   @Override
   public void reduce(IntWritable index1,
-                     Iterator<IntWritable> index2s,
-                     OutputCollector<IntWritable,VectorWritable> output,
+                     Iterator<EntityCountWritable> index2s,
+                     OutputCollector<IntWritable,RandomAccessSparseVectorWritable> output,
                      Reporter reporter) throws IOException {
-    if (index2s.hasNext()) {
-      RandomAccessSparseVector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
-      while (index2s.hasNext()) {
-        int index2 = index2s.next().get();
-        cooccurrenceRow.set(index2, cooccurrenceRow.get(index2) + 1.0);
-      }
-      Iterator<Vector.Element> cooccurrences = cooccurrenceRow.iterateNonZero();
-      while (cooccurrences.hasNext()) {
-        Vector.Element element = cooccurrences.next();
-        if (element.get() <= 1.0) { // purge small values
-          element.set(0.0);
-        }
-      }
-      VectorWritable writable = new VectorWritable(cooccurrenceRow);
-      output.collect(index1, writable);
+
+    OpenIntIntHashMap indexCounts = new OpenIntIntHashMap();
+    while (index2s.hasNext()) {
+      EntityCountWritable writable = index2s.next();
+      int index = (int) writable.getID();
+      int oldValue = indexCounts.get(index);
+      indexCounts.put(index, oldValue + writable.getCount());
     }
+
+    final RandomAccessSparseVector cooccurrenceRow =
+        new RandomAccessSparseVector(Integer.MAX_VALUE, 1000);
+    indexCounts.forEachPair(new IntIntProcedure() {
+      @Override
+      public boolean apply(int index, int count) {
+        cooccurrenceRow.set(index, count);
+        return true;
+      }
+    });
+    output.collect(index1, new RandomAccessSparseVectorWritable(cooccurrenceRow));
   }
   
 }
\ No newline at end of file

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java?rev=937210&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java Fri Apr 23 09:34:57 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.RandomAccessSparseVectorWritable;
+import org.apache.mahout.math.Vector;
+
+public final class VectorOrPrefWritable implements Writable {
+
+  private RandomAccessSparseVector vector;
+  private long userID;
+  private float value;
+
+  public VectorOrPrefWritable() {
+  }
+
+  public VectorOrPrefWritable(RandomAccessSparseVector vector) {
+    this.vector = vector;
+  }
+
+  public VectorOrPrefWritable(long userID, float value) {
+    this.userID = userID;
+    this.value = value;
+  }
+
+  public Vector getVector() {
+    return vector;
+  }
+
+  public long getUserID() {
+    return userID;
+  }
+
+  public float getValue() {
+    return value;
+  }
+
+  public void set(RandomAccessSparseVector vector) {
+    this.vector = vector;
+    this.userID = Long.MIN_VALUE;
+    this.value = Float.NaN;
+  }
+
+  public void set(long userID, float value) {
+    this.vector = null;
+    this.userID = userID;
+    this.value = value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (vector == null) {
+      out.writeBoolean(false);
+      out.writeLong(userID);
+      out.writeFloat(value);
+    } else {
+      out.writeBoolean(true);
+      new RandomAccessSparseVectorWritable(vector).write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    boolean hasVector = in.readBoolean();
+    if (hasVector) {
+      RandomAccessSparseVectorWritable writable = new RandomAccessSparseVectorWritable();
+      writable.readFields(in);
+      set((RandomAccessSparseVector) writable.get());
+    } else {
+      long theUserID = in.readLong();
+      float theValue = in.readFloat();
+      set(theUserID, theValue);
+    }
+  }
+
+}
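
The leading boolean written above tags which variant follows, making this a two-variant
union on the wire. A quick round-trip sketch, as a hypothetical harness using Hadoop's
in-memory buffers (not part of the commit):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

    public final class VectorOrPrefRoundTrip {
      public static void main(String[] args) throws Exception {
        VectorOrPrefWritable pref = new VectorOrPrefWritable(123L, 4.5f);

        DataOutputBuffer out = new DataOutputBuffer();
        pref.write(out); // writes false, then the user ID and preference value

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        VectorOrPrefWritable copy = new VectorOrPrefWritable();
        copy.readFields(in);

        // Consumers such as PartialMultiplyReducer branch on getVector() == null:
        System.out.println(copy.getVector() == null
            ? copy.getUserID() + "\t" + copy.getValue()
            : copy.getVector().toString());
      }
    }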

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPairWritable.java Fri Apr 23 09:34:57 2010
@@ -59,7 +59,8 @@ public final class ItemPairWritable impl
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    itemItemWritable = EntityEntityWritable.read(in);
+    itemItemWritable = new EntityEntityWritable();
+    itemItemWritable.readFields(in);
     multipliedLength = in.readDouble();
   }
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemPrefWithLengthWritable.java Fri Apr 23 09:34:57 2010
@@ -64,7 +64,8 @@ public final class ItemPrefWithLengthWri
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    itemPref = EntityPrefWritable.read(in);
+    itemPref = new EntityPrefWritable();
+    itemPref.readFields(in);
     length = in.readDouble();
   }
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Fri Apr 23 09:34:57 2010
@@ -176,10 +176,13 @@ public abstract class AbstractJob extend
       jobConf.setBoolean("mapred.output.compress", true);
       // otherwise leave it to its default value
     }
+    jobConf.setCompressMapOutput(true);
+
     String customJobName = jobConf.get("mapred.job.name");
-    if (customJobName != null) {
-      jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName());
+    if (customJobName == null) {
+      customJobName = getClass().getSimpleName();
     }
+    jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName());
 
     jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
     // Override this:    

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java?rev=937210&r1=937209&r2=937210&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/VectorWritable.java Fri Apr 23 09:34:57 2010
@@ -40,7 +40,7 @@ public class VectorWritable extends Conf
     return vector;
   }
 
-  protected void set(Vector vector) {
+  public void set(Vector vector) {
     this.vector = vector;
   }