You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the conversion to plain text.
Posted to commits@mahout.apache.org by ss...@apache.org on 2013/03/25 09:16:23 UTC

svn commit: r1460541 - in /mahout/trunk/core/src: main/java/org/apache/mahout/cf/taste/common/ main/java/org/apache/mahout/cf/taste/hadoop/ main/java/org/apache/mahout/cf/taste/hadoop/als/ main/java/org/apache/mahout/cf/taste/hadoop/item/ main/java/org...

Author: ssc
Date: Mon Mar 25 08:16:22 2013
New Revision: 1460541

URL: http://svn.apache.org/r1460541
Log:
MAHOUT-1172 Replace org.apache.mahout.cf.taste.common.TopK with Lucene's PriorityQueue

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java
      - copied, changed from r1460336, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MutableRecommendedItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueue.java
      - copied, changed from r1460336, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueue.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/TopElementsQueue.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java
      - copied, changed from r1460336, mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueueTest.java
Removed:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/FixedSizePriorityQueue.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/MinK.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/common/TopK.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MutableRecommendedItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueue.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/common/TopKMinKTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueueTest.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/similarity/precompute/SimilarItem.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
    mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java (from r1460336, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MutableRecommendedItem.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MutableRecommendedItem.java&r1=1460336&r2=1460541&rev=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MutableRecommendedItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MutableRecommendedItem.java Mon Mar 25 08:16:22 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.als;
+package org.apache.mahout.cf.taste.hadoop;
 
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.RandomUtils;
@@ -23,11 +23,18 @@ import org.apache.mahout.common.RandomUt
 /**
  * Mutable variant of {@link RecommendedItem}
  */
-class MutableRecommendedItem implements RecommendedItem {
+public class MutableRecommendedItem implements RecommendedItem {
 
   private long itemID;
   private float value;
 
+  public MutableRecommendedItem() {}
+
+  public MutableRecommendedItem(long itemID, float value) {
+    this.itemID = itemID;
+    this.value = value;
+  }
+
   @Override
   public long getItemID() {
     return itemID;

Copied: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueue.java (from r1460336, mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueue.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueue.java?p2=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueue.java&p1=mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueue.java&r1=1460336&r2=1460541&rev=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueue.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueue.java Mon Mar 25 08:16:22 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.als;
+package org.apache.mahout.cf.taste.hadoop;
 
 import com.google.common.collect.Lists;
 import org.apache.lucene.util.PriorityQueue;
@@ -24,13 +24,13 @@ import org.apache.mahout.cf.taste.recomm
 import java.util.Collections;
 import java.util.List;
 
-public class TopItemQueue extends PriorityQueue<MutableRecommendedItem> {
+public class TopItemsQueue extends PriorityQueue<MutableRecommendedItem> {
 
   private static final long SENTINEL_ID = Long.MIN_VALUE;
 
   private final int maxSize;
 
-  public TopItemQueue(int maxSize) {
+  public TopItemsQueue(int maxSize) {
     super(maxSize);
     this.maxSize = maxSize;
   }
@@ -55,8 +55,7 @@ public class TopItemQueue extends Priori
 
   @Override
   protected MutableRecommendedItem getSentinelObject() {
-    MutableRecommendedItem sentinel =  new MutableRecommendedItem();
-    sentinel.set(SENTINEL_ID, Float.MIN_VALUE);
-    return sentinel;
+    // sentinels must rank below every real value; Float.MIN_VALUE is the smallest *positive* float
+    return new MutableRecommendedItem(SENTINEL_ID, Float.NEGATIVE_INFINITY);
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/PredictionMapper.java Mon Mar 25 08:16:22 2013
@@ -19,7 +19,9 @@ package org.apache.mahout.cf.taste.hadoo
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.math.Vector;
@@ -79,7 +81,7 @@ public class PredictionMapper extends Sh
       alreadyRatedItems.add(ratingsIterator.next().index());
     }
 
-    final TopItemQueue topItemQueue = new TopItemQueue(recommendationsPerUser);
+    final TopItemsQueue topItemsQueue = new TopItemsQueue(recommendationsPerUser);
     final Vector userFeatures = U.get(userID);
 
     M.forEachPair(new IntObjectProcedure<Vector>() {
@@ -88,17 +90,17 @@ public class PredictionMapper extends Sh
         if (!alreadyRatedItems.contains(itemID)) {
           double predictedRating = userFeatures.dot(itemFeatures);
 
-          MutableRecommendedItem top = topItemQueue.top();
+          MutableRecommendedItem top = topItemsQueue.top();
           if (predictedRating > top.getValue()) {
             top.set(itemID, (float) predictedRating);
-            topItemQueue.updateTop();
+            topItemsQueue.updateTop();
           }
         }
         return true;
       }
     });
 
-    List<RecommendedItem> recommendedItems = topItemQueue.getTopItems();
+    List<RecommendedItem> recommendedItems = topItemsQueue.getTopItems();
 
     if (!recommendedItems.isEmpty()) {
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Mon Mar 25 08:16:22 2013
@@ -21,11 +21,11 @@ import com.google.common.primitives.Floa
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.cf.taste.common.TopK;
+import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
-import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.FileLineIterable;
@@ -38,6 +38,7 @@ import org.apache.mahout.math.map.OpenIn
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -188,7 +189,7 @@ public final class AggregateAndRecommend
   private void writeRecommendedItems(VarLongWritable userID, Vector recommendationVector, Context context)
     throws IOException, InterruptedException {
 
-    TopK<RecommendedItem> topKItems = new TopK<RecommendedItem>(recommendationsPerUser, BY_PREFERENCE_VALUE);
+    TopItemsQueue topKItems = new TopItemsQueue(recommendationsPerUser);
 
     Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
     while (recommendationVectorIterator.hasNext()) {
@@ -203,13 +204,19 @@ public final class AggregateAndRecommend
       if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
         float value = (float) element.get();
         if (!Float.isNaN(value)) {
-          topKItems.offer(new GenericRecommendedItem(itemID, value));
+
+          MutableRecommendedItem topItem = topKItems.top();
+          if (value > topItem.getValue()) {
+            topItem.set(itemID, value);
+            topKItems.updateTop();
+          }
         }
       }
     }
 
-    if (!topKItems.isEmpty()) {
-      context.write(userID, new RecommendedItemsWritable(topKItems.retrieve()));
+    List<RecommendedItem> topItems = topKItems.getTopItems();
+    if (!topItems.isEmpty()) {
+      context.write(userID, new RecommendedItemsWritable(topItems));
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Mon Mar 25 08:16:22 2013
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.cf.taste.common.TopK;
+import org.apache.lucene.util.PriorityQueue;
 import org.apache.mahout.cf.taste.impl.common.FastIDSet;
 import org.apache.mahout.common.iterator.FileLineIterable;
 import org.apache.mahout.math.VarIntWritable;
@@ -32,7 +32,6 @@ import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.Iterator;
 
 import org.slf4j.Logger;
@@ -120,19 +119,28 @@ public final class UserVectorSplitterMap
 
+  /**
+   * Returns the smallest of the (at most) {@code maxPrefsPerUserConsidered} largest absolute values among the
+   * non-zero entries of {@code userVector}.
+   *
+   * <p>NOTE(review): Lucene's {@code PriorityQueue.top()} returns {@code null} when the queue is empty, so
+   * unboxing it would throw a NullPointerException if {@code userVector} has no non-zero entries. Presumably
+   * the caller only invokes this method for non-empty vectors; confirm.</p>
+   */
   private float findSmallestLargeValue(Vector userVector) {
 
-    TopK<Float> topPrefValues = new TopK<Float>(maxPrefsPerUserConsidered, new Comparator<Float>() {
+    PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(maxPrefsPerUserConsidered) {
       @Override
-      public int compare(Float one, Float two) {
-        return one.compareTo(two);
+      protected boolean lessThan(Float f1, Float f2) {
+        return f1 < f2;
       }
-    });
+    };
 
     Iterator<Vector.Element> it = userVector.iterateNonZero();
     while (it.hasNext()) {
       float absValue = Math.abs((float) it.next().get());
-      topPrefValues.offer(absValue);
+      topPrefValues.insertWithOverflow(absValue);
     }
-    return topPrefValues.smallestGreat();
+    // no sentinels are configured, so top() is the smallest value currently retained (null if none)
+    return topPrefValues.top();
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Mon Mar 25 08:16:22 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.Reduc
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.common.TopK;
 import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
@@ -197,18 +196,22 @@ public final class ItemSimilarityJob ext
 
       int itemIDIndex = itemIDIndexWritable.get();
 
-      TopK<SimilarItem> topKMostSimilarItems =
-          new TopK<SimilarItem>(maxSimilarItemsPerItem, SimilarItem.COMPARE_BY_SIMILARITY);
+      TopSimilarItemsQueue topKMostSimilarItems = new TopSimilarItemsQueue(maxSimilarItemsPerItem);
 
       Iterator<Vector.Element> similarityVectorIterator = similarityVector.get().iterateNonZero();
 
       while (similarityVectorIterator.hasNext()) {
         Vector.Element element = similarityVectorIterator.next();
-        topKMostSimilarItems.offer(new SimilarItem(indexItemIDMap.get(element.index()), element.get()));
+        SimilarItem top = topKMostSimilarItems.top();
+        double candidateSimilarity = element.get();
+        if (candidateSimilarity > top.getSimilarity()) {
+          top.set(indexItemIDMap.get(element.index()), candidateSimilarity);
+          topKMostSimilarItems.updateTop();
+        }
       }
 
       long itemID = indexItemIDMap.get(itemIDIndex);
-      for (SimilarItem similarItem : topKMostSimilarItems.retrieve()) {
+      for (SimilarItem similarItem : topKMostSimilarItems.getTopItems()) {
         long otherItemID = similarItem.getItemID();
         if (itemID < otherItemID) {
           ctx.write(new EntityEntityWritable(itemID, otherItemID), new DoubleWritable(similarItem.getSimilarity()));

Added: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java?rev=1460541&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/TopSimilarItemsQueue.java Mon Mar 25 08:16:22 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.similarity.item;
+
+import com.google.common.collect.Lists;
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.mahout.cf.taste.similarity.precompute.SimilarItem;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Bounded queue that retains the {@code maxSize} {@link SimilarItem}s with the highest similarity.
+ *
+ * <p>The queue is pre-filled with sentinel entries (see {@link #getSentinelObject()}), so callers update it in
+ * place: compare a candidate against {@link #top()}, the least similar retained item, and if the candidate is
+ * more similar, overwrite that item via {@link SimilarItem#set(long, double)} and call {@link #updateTop()}.</p>
+ */
+public class TopSimilarItemsQueue extends PriorityQueue<SimilarItem> {
+
+  /** Item ID that marks sentinel entries; such entries are dropped by {@link #getTopItems()}. */
+  private static final long SENTINEL_ID = Long.MIN_VALUE;
+
+  private final int maxSize;
+
+  /**
+   * @param maxSize number of most similar items to retain
+   */
+  public TopSimilarItemsQueue(int maxSize) {
+    super(maxSize);
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Drains the queue and returns the retained items, most similar first. Sentinel entries that were never
+   * replaced by a real item are left out, and the queue is empty afterwards.
+   *
+   * @return the retained items ordered by descending similarity; empty if no candidate displaced a sentinel
+   */
+  public List<SimilarItem> getTopItems() {
+    List<SimilarItem> items = Lists.newArrayListWithCapacity(maxSize);
+    while (size() > 0) {
+      SimilarItem topItem = pop();
+      // filter out "sentinel" objects necessary for maintaining an efficient priority queue
+      if (topItem.getItemID() != SENTINEL_ID) {
+        items.add(topItem);
+      }
+    }
+    // pop() returns the least similar item first, so reverse to obtain descending order
+    Collections.reverse(items);
+    return items;
+  }
+
+  @Override
+  protected boolean lessThan(SimilarItem one, SimilarItem two) {
+    return one.getSimilarity() < two.getSimilarity();
+  }
+
+  /**
+   * Sentinels must rank below every real similarity. {@code Double.MIN_VALUE} is the smallest <em>positive</em>
+   * double, so it would silently reject zero and negative similarities; negative infinity lets every finite
+   * similarity displace a sentinel.
+   */
+  @Override
+  protected SimilarItem getSentinelObject() {
+    return new SimilarItem(SENTINEL_ID, Double.NEGATIVE_INFINITY);
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/similarity/precompute/SimilarItem.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/similarity/precompute/SimilarItem.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/similarity/precompute/SimilarItem.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/similarity/precompute/SimilarItem.java Mon Mar 25 08:16:22 2013
@@ -33,10 +33,14 @@ public class SimilarItem {
     }
   };
 
-  private final long itemID;
-  private final double similarity;
+  private long itemID;
+  private double similarity;
 
   public SimilarItem(long itemID, double similarity) {
+    set(itemID, similarity);
+  }
+
+  public void set(long itemID, double similarity) {
     this.itemID = itemID;
     this.similarity = similarity;
   }

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java?rev=1460541&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java Mon Mar 25 08:16:22 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import org.apache.mahout.math.Vector;
+
+/**
+ * Mutable implementation of {@link Vector.Element} that is not backed by a vector. It lets bounded top-k queues
+ * such as {@link TopElementsQueue} reuse their entries instead of allocating one object per candidate.
+ */
+public class MutableElement implements Vector.Element {
+
+  private int index;
+  private double value;
+
+  /**
+   * @param index index of the element
+   * @param value value of the element
+   */
+  MutableElement(int index, double value) {
+    this.index = index;
+    this.value = value;
+  }
+
+  @Override
+  public double get() {
+    return value;
+  }
+
+  @Override
+  public int index() {
+    return index;
+  }
+
+  /**
+   * @param index new index of this element
+   */
+  public void setIndex(int index) {
+    this.index = index;
+  }
+
+  @Override
+  public void set(double value) {
+    this.value = value;
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java Mon Mar 25 08:16:22 2013
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.cf.taste.common.TopK;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
@@ -408,17 +407,25 @@ public class RowSimilarityJob extends Ab
       Vector similarities = similaritiesWritable.get();
       // For performance, the creation of transposedPartial is moved out of the while loop and it is reused inside
       Vector transposedPartial = new RandomAccessSparseVector(similarities.size(), 1);
-      TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
+      TopElementsQueue topKQueue = new TopElementsQueue(maxSimilaritiesPerRow);
       Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
       while (nonZeroElements.hasNext()) {
         Vector.Element nonZeroElement = nonZeroElements.next();
-        topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
-        transposedPartial.setQuick(row.get(), nonZeroElement.get());
+
+        MutableElement top = topKQueue.top();
+        double candidateValue = nonZeroElement.get();
+        if (candidateValue > top.get()) {
+          top.setIndex(nonZeroElement.index());
+          top.set(candidateValue);
+          topKQueue.updateTop();
+        }
+
+        transposedPartial.setQuick(row.get(), candidateValue);
         ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
         transposedPartial.setQuick(row.get(), 0.0);
       }
       Vector topKSimilarities = new RandomAccessSparseVector(similarities.size(), maxSimilaritiesPerRow);
-      for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
+      for (Vector.Element topKSimilarity : topKQueue.getTopElements()) {
         topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
       }
       ctx.write(row, new VectorWritable(topKSimilarities));

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/TopElementsQueue.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/TopElementsQueue.java?rev=1460541&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/TopElementsQueue.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/TopElementsQueue.java Mon Mar 25 08:16:22 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import com.google.common.collect.Lists;
+import org.apache.lucene.util.PriorityQueue;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Bounded queue that retains the {@code maxSize} {@link MutableElement}s with the highest values.
+ *
+ * <p>The queue is pre-filled with sentinel entries (see {@link #getSentinelObject()}), so callers update it in
+ * place: compare a candidate against {@link #top()}, the smallest retained element, and if the candidate is
+ * larger, overwrite that element via {@link MutableElement#setIndex(int)} and {@link MutableElement#set(double)}
+ * and call {@link #updateTop()}.</p>
+ */
+public class TopElementsQueue extends PriorityQueue<MutableElement> {
+
+  private final int maxSize;
+
+  /** Index that marks sentinel entries; such entries are dropped by {@link #getTopElements()}. */
+  private static final int SENTINEL_INDEX = Integer.MIN_VALUE;
+
+  /**
+   * @param maxSize number of highest-valued elements to retain
+   */
+  public TopElementsQueue(int maxSize) {
+    super(maxSize);
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Drains the queue and returns the retained elements, largest value first. Sentinel entries that were never
+   * replaced by a real element are left out, and the queue is empty afterwards.
+   *
+   * @return the retained elements ordered by descending value; empty if no candidate displaced a sentinel
+   */
+  public List<MutableElement> getTopElements() {
+    List<MutableElement> topElements = Lists.newArrayListWithCapacity(maxSize);
+    while (size() > 0) {
+      MutableElement top = pop();
+      // filter out "sentinel" objects necessary for maintaining an efficient priority queue
+      if (top.index() != SENTINEL_INDEX) {
+        topElements.add(top);
+      }
+    }
+    // pop() returns the smallest element first, so reverse to obtain descending order
+    Collections.reverse(topElements);
+    return topElements;
+  }
+
+  /**
+   * Sentinels must rank below every real value. {@code Double.MIN_VALUE} is the smallest <em>positive</em> double,
+   * so it would silently reject zero and negative values; negative infinity lets every finite value displace a
+   * sentinel.
+   */
+  @Override
+  protected MutableElement getSentinelObject() {
+    return new MutableElement(SENTINEL_INDEX, Double.NEGATIVE_INFINITY);
+  }
+
+  @Override
+  protected boolean lessThan(MutableElement e1, MutableElement e2) {
+    return e1.get() < e2.get();
+  }
+}

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java Mon Mar 25 08:16:22 2013
@@ -19,13 +19,11 @@ package org.apache.mahout.math.hadoop.si
 
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
-import com.google.common.primitives.Doubles;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.mahout.cf.taste.common.TopK;
 import org.apache.mahout.common.iterator.FixedSizeSamplingIterator;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.Varint;
@@ -35,13 +33,11 @@ import org.apache.mahout.math.map.OpenIn
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.util.Comparator;
 import java.util.Iterator;
 
 public final class Vectors {
 
-  private Vectors() {
-  }
+  private Vectors() {}
 
   public static Vector maybeSample(Vector original, int sampleSize) {
     if (original.getNumNondefaultElements() <= sampleSize) {
@@ -61,14 +57,23 @@ public final class Vectors {
     if (original.getNumNondefaultElements() <= k) {
       return original;
     }
-    TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(k, BY_VALUE);
+
+    TopElementsQueue topKQueue = new TopElementsQueue(k);
     Iterator<Vector.Element> nonZeroElements = original.iterateNonZero();
     while (nonZeroElements.hasNext()) {
       Vector.Element nonZeroElement = nonZeroElements.next();
-      topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
+
+      MutableElement top = topKQueue.top();
+      double candidateValue = nonZeroElement.get();
+      if (candidateValue > top.get()) {
+        top.setIndex(nonZeroElement.index());
+        top.set(candidateValue);
+        topKQueue.updateTop();
+      }
     }
+
     Vector topKSimilarities = new RandomAccessSparseVector(original.size(), k);
-    for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
+    for (Vector.Element topKSimilarity : topKQueue.getTopElements()) {
       topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
     }
     return topKSimilarities;
@@ -90,13 +95,6 @@ public final class Vectors {
     return accumulator;
   }
 
-  static final Comparator<Vector.Element> BY_VALUE = new Comparator<Vector.Element>() {
-    @Override
-    public int compare(Vector.Element elem1, Vector.Element elem2) {
-      return Doubles.compare(elem1.get(), elem2.get());
-    }
-  };
-
   static class TemporaryElement implements Vector.Element {
 
     private final int index;

Copied: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java (from r1460336, mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueueTest.java)
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java?p2=mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java&p1=mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueueTest.java&r1=1460336&r2=1460541&rev=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/als/TopItemQueueTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/TopItemsQueueTest.java Mon Mar 25 08:16:22 2013
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.mahout.cf.taste.hadoop.als;
+package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
+import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
 import org.apache.mahout.cf.taste.impl.TasteTestCase;
 import org.apache.mahout.cf.taste.recommender.RecommendedItem;
 import org.apache.mahout.common.MahoutTestCase;
@@ -24,7 +26,7 @@ import org.junit.Test;
 
 import java.util.List;
 
-public class TopItemQueueTest extends TasteTestCase {
+public class TopItemsQueueTest extends TasteTestCase {
 
   @Test
   public void topK() {
@@ -56,7 +58,7 @@ public class TopItemQueueTest extends Ta
 
 
   private static List<RecommendedItem> findTop(float[] ratings, int k) {
-    TopItemQueue queue = new TopItemQueue(k);
+    TopItemsQueue queue = new TopItemsQueue(k);
 
     for (int item = 0; item < ratings.length; item++) {
       MutableRecommendedItem top = queue.top();

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java?rev=1460541&r1=1460540&r2=1460541&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJobTest.java Mon Mar 25 08:16:22 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
 import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
 import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
@@ -537,8 +538,8 @@ public class RecommenderJobTest extends 
     Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
         EasyMock.createMock(Reducer.Context.class);
 
-    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f),
-        new GenericRecommendedItem(2L, 2.0f)));
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new MutableRecommendedItem(1L, 2.8f),
+        new MutableRecommendedItem(2L, 2.0f)));
 
     EasyMock.replay(context);
 
@@ -576,7 +577,7 @@ public class RecommenderJobTest extends 
     Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
         EasyMock.createMock(Reducer.Context.class);
 
-    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new MutableRecommendedItem(1L, 2.8f)));
 
     EasyMock.replay(context);
 
@@ -613,7 +614,7 @@ public class RecommenderJobTest extends 
     Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable>.Context context =
       EasyMock.createMock(Reducer.Context.class);
 
-    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new GenericRecommendedItem(1L, 2.8f)));
+    context.write(EasyMock.eq(new VarLongWritable(123L)), recommendationsMatch(new MutableRecommendedItem(1L, 2.8f)));
 
     EasyMock.replay(context);