You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this text.)
Posted to commits@mahout.apache.org by sr...@apache.org on 2009/12/11 20:04:24 UTC
svn commit: r889767 - in /lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop: ./ item/
Author: srowen
Date: Fri Dec 11 19:04:23 2009
New Revision: 889767
URL: http://svn.apache.org/viewvc?rev=889767&view=rev
Log:
Switch Hadoop job to use column-based computation instead of row-based
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ItemPrefWritable.java Fri Dec 11 19:04:23 2009
@@ -23,7 +23,7 @@
import java.io.DataOutput;
import java.io.IOException;
-/** A {@link Writable} encapsulating an item and a preference value. */
+/** A {@link Writable} encapsulating an item ID and a preference value. */
public final class ItemPrefWritable implements Writable {
private long itemID;
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java?rev=889767&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/MapFilesMap.java Fri Dec 11 19:04:23 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a series of {@link MapFile}s, from which one might want to look up values
+ * based on keys. It just provides a simplified way to open them all up, and search
+ * from all of them.
+ */
+public final class MapFilesMap<K extends WritableComparable, V extends Writable> implements Closeable {
+
+ private static final PathFilter PARTS_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("part-");
+ }
+ };
+
+ private final List<MapFile.Reader> readers;
+
+ public MapFilesMap(FileSystem fs,
+ Path parentDir,
+ Configuration conf) throws IOException {
+ readers = new ArrayList<MapFile.Reader>();
+ try {
+ for (FileStatus status : fs.listStatus(parentDir, PARTS_FILTER)) {
+ readers.add(new MapFile.Reader(fs, status.getPath().toString(), conf));
+ }
+ } catch (IOException ioe) {
+ close();
+ throw ioe;
+ }
+ if (readers.isEmpty()) {
+ throw new IllegalArgumentException("No MapFiles found in " + parentDir);
+ }
+ }
+
+ /**
+ * @return value reference if key is found, filled in with value data, or null if not found
+ */
+ public V get(K key, V value) throws IOException {
+ for (MapFile.Reader reader : readers) {
+ if (reader.get(key, value) != null) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ for (MapFile.Reader reader : readers) {
+ try {
+ reader.close();
+ } catch (IOException ioe) {
+ // continue
+ }
+ }
+ }
+}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommendedItemsWritable.java Fri Dec 11 19:04:23 2009
@@ -97,7 +97,9 @@
result.append(':');
String valueString = String.valueOf(item.getValue());
// Is this rounding too crude?
- if (valueString.length() > 6) {
+ if (valueString.indexOf('E') >= 0) {
+ valueString = "0.0";
+ } else if (valueString.length() > 6) {
valueString = valueString.substring(0, 6);
}
result.append(valueString);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Fri Dec 11 19:04:23 2009
@@ -27,6 +27,19 @@
import java.io.IOException;
import java.util.regex.Pattern;
+/**
+ * <h1>Input</h1>
+ *
+ * <p>Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts
+ * line number / line pairs as {@link LongWritable}/{@link Text} pairs.</p>
+ *
+ * <p>Each line is assumed to be of the form <code>userID,itemID,preference</code>.</p>
+ *
+ * <h1>Output</h1>
+ *
+ * <p>Outputs the user ID as a {@link LongWritable} mapped to the item ID and preference
+ * as a {@link ItemPrefWritable}.</p>
+ */
public final class ToItemPrefsMapper
extends MapReduceBase
implements Mapper<LongWritable, Text, LongWritable, ItemPrefWritable> {
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Fri Dec 11 19:04:23 2009
@@ -41,11 +41,11 @@
Reporter reporter) throws IOException {
String[] tokens = COMMA.split(value.toString());
long itemID = Long.parseLong(tokens[1]);
- int index = itemIDToIndex(itemID);
+ int index = idToIndex(itemID);
output.collect(new IntWritable(index), new LongWritable(itemID));
}
- static int itemIDToIndex(long itemID) {
+ static int idToIndex(long itemID) {
return (int) (itemID) ^ (int) (itemID >>> 32);
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Fri Dec 11 19:04:23 2009
@@ -24,6 +24,7 @@
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
@@ -83,7 +84,7 @@
ItemIDIndexReducer.class,
IntWritable.class,
LongWritable.class,
- SequenceFileOutputFormat.class);
+ MapFileOutputFormat.class);
JobClient.runJob(itemIDIndexConf);
JobConf toUserVectorConf = prepareJobConf(inputPath,
@@ -109,7 +110,7 @@
UserVectorToCooccurrenceReducer.class,
IntWritable.class,
SparseVector.class,
- SequenceFileOutputFormat.class);
+ MapFileOutputFormat.class);
JobClient.runJob(toCooccurrenceConf);
JobConf recommenderConf = prepareJobConf(userVectorPath,
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderMapper.java Fri Dec 11 19:04:23 2009
@@ -18,20 +18,17 @@
package org.apache.mahout.cf.taste.hadoop.item;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.cf.taste.hadoop.MapFilesMap;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
-import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.matrix.SparseVector;
@@ -53,40 +50,19 @@
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
- private static final PathFilter IGNORABLE_FILES_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return !path.getName().startsWith("_logs");
- }
- };
-
- private FileSystem fs;
- private Path cooccurrencePath;
private int recommendationsPerUser;
- private FastByIDMap<Long> indexItemIDMap;
+ private MapFilesMap<IntWritable,LongWritable> indexItemIDMap;
+ private MapFilesMap<IntWritable,Vector> cooccurrenceColumnMap;
@Override
public void configure(JobConf jobConf) {
try {
- fs = FileSystem.get(jobConf);
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
- cooccurrencePath = new Path(jobConf.get(COOCCURRENCE_PATH)).makeQualified(fs);
- Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
- recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
- indexItemIDMap = new FastByIDMap<Long>();
- try {
- IntWritable index = new IntWritable();
- LongWritable itemID = new LongWritable();
- Configuration conf = new Configuration();
- for (FileStatus status : fs.listStatus(itemIDIndexPath, IGNORABLE_FILES_FILTER)) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
- while (reader.next(index, itemID)) {
- indexItemIDMap.put(index.get(), itemID.get());
- }
- reader.close();
- }
+ FileSystem fs = FileSystem.get(jobConf);
+ Path cooccurrencePath = new Path(jobConf.get(COOCCURRENCE_PATH)).makeQualified(fs);
+ Path itemIDIndexPath = new Path(jobConf.get(ITEMID_INDEX_PATH)).makeQualified(fs);
+ recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
+ indexItemIDMap = new MapFilesMap<IntWritable,LongWritable>(fs, itemIDIndexPath, new Configuration());
+ cooccurrenceColumnMap = new MapFilesMap<IntWritable,Vector>(fs, cooccurrencePath, new Configuration());
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
@@ -97,45 +73,45 @@
SparseVector userVector,
OutputCollector<LongWritable, RecommendedItemsWritable> output,
Reporter reporter) throws IOException {
- IntWritable indexWritable = new IntWritable();
- Vector cooccurrenceVector = new SparseVector(Integer.MAX_VALUE, 1000);
- Configuration conf = new Configuration();
+
+ Iterator<Vector.Element> userVectorIterator = userVector.iterateNonZero();
+ Vector recommendationVector = new SparseVector(Integer.MAX_VALUE, 1000);
+ Vector columnVector = new SparseVector(Integer.MAX_VALUE, 1000);
+ while (userVectorIterator.hasNext()) {
+ Vector.Element element = userVectorIterator.next();
+ int index = element.index();
+ double value = element.get();
+ cooccurrenceColumnMap.get(new IntWritable(index), columnVector);
+ columnVector.times(value).addTo(recommendationVector);
+ }
+
Queue<RecommendedItem> topItems =
- new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
- for (FileStatus status : fs.listStatus(cooccurrencePath, IGNORABLE_FILES_FILTER)) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
- while (reader.next(indexWritable, cooccurrenceVector)) {
- Long itemID = indexItemIDMap.get(indexWritable.get());
- if (itemID != null) {
- processOneRecommendation(userVector, itemID, cooccurrenceVector, topItems);
- } else {
- throw new IllegalStateException("Found index without item ID: " + indexWritable.get());
- }
+ new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, Collections.reverseOrder());
+
+ Iterator<Vector.Element> recommendationVectorIterator = recommendationVector.iterateNonZero();
+ LongWritable itemID = new LongWritable();
+ while (recommendationVectorIterator.hasNext()) {
+ Vector.Element element = recommendationVectorIterator.next();
+ if (topItems.size() < recommendationsPerUser) {
+ indexItemIDMap.get(new IntWritable(element.index()), itemID);
+ topItems.add(new GenericRecommendedItem(itemID.get(), (float) element.get()));
+ } else if (element.get() > topItems.peek().getValue()) {
+ indexItemIDMap.get(new IntWritable(element.index()), itemID);
+ topItems.add(new GenericRecommendedItem(itemID.get(), (float) element.get()));
+ topItems.poll();
}
- reader.close();
}
+
List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
recommendations.addAll(topItems);
+ Collections.sort(recommendations);
output.collect(userID, new RecommendedItemsWritable(recommendations));
}
- private void processOneRecommendation(Vector userVector,
- long itemID,
- Vector cooccurrenceVector,
- Queue<RecommendedItem> topItems) {
- double totalWeight = 0.0;
- Iterator<Vector.Element> cooccurrences = cooccurrenceVector.iterateNonZero();
- while (cooccurrences.hasNext()) {
- Vector.Element cooccurrence = cooccurrences.next();
- totalWeight += cooccurrence.get();
- }
- double score = userVector.dot(cooccurrenceVector) / totalWeight;
- if (!Double.isNaN(score)) {
- topItems.add(new GenericRecommendedItem(itemID, (float) score));
- if (topItems.size() > recommendationsPerUser) {
- topItems.poll();
- }
- }
+ @Override
+ public void close() {
+ indexItemIDMap.close();
+ cooccurrenceColumnMap.close();
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Fri Dec 11 19:04:23 2009
@@ -24,14 +24,37 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.cf.taste.hadoop.ItemPrefWritable;
import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
import java.io.IOException;
import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Queue;
+/**
+ * <h1>Input</h1>
+ *
+ * <p>Takes user IDs as {@link LongWritable} mapped to all associated item IDs
+ * and preference values, as {@link ItemPrefWritable}s.</p>
+ *
+ * <h1>Output</h1>
+ *
+ * <p>The same user ID mapped to a {@link SparseVector} representation of the
+ * same item IDs and preference values. Item IDs are used as vector indexes;
+ * they are hashed into ints to work as indexes with
+ * {@link ItemIDIndexMapper#idToIndex(long)}. The mapping is remembered for
+ * later with a combination of {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.</p>
+ *
+ * <p>The number of non-default elements actually retained in the user vector is capped
+ * at {@link #MAX_PREFS_CONSIDERED}.</p>
+ *
+ */
public final class ToUserVectorReducer
extends MapReduceBase
implements Reducer<LongWritable, ItemPrefWritable, LongWritable, SparseVector> {
+ public static final int MAX_PREFS_CONSIDERED = 50;
+
@Override
public void reduce(LongWritable userID,
Iterator<ItemPrefWritable> itemPrefs,
@@ -41,11 +64,40 @@
SparseVector userVector = new SparseVector(Integer.MAX_VALUE, 100);
while (itemPrefs.hasNext()) {
ItemPrefWritable itemPref = itemPrefs.next();
- int index = ItemIDIndexMapper.itemIDToIndex(itemPref.getItemID());
+ int index = ItemIDIndexMapper.idToIndex(itemPref.getItemID());
userVector.set(index, itemPref.getPrefValue());
}
+
+ if (userVector.getNumNondefaultElements() > MAX_PREFS_CONSIDERED) {
+ double cutoff = findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
+ SparseVector filteredVector = new SparseVector(Integer.MAX_VALUE, MAX_PREFS_CONSIDERED);
+ Iterator<Vector.Element> it = userVector.iterateNonZero();
+ while (it.hasNext()) {
+ Vector.Element element = it.next();
+ if (element.get() >= cutoff) {
+ filteredVector.set(element.index(), element.get());
+ }
+ }
+ userVector = filteredVector;
+ }
+
output.collect(userID, userVector);
}
}
+  /**
+   * Finds the smallest value among the {@code n} largest non-zero preference values in
+   * {@code userVector}. Keeping every element whose value is {@code >=} the result retains
+   * the top {@code n} preferences (more if several values tie at the cutoff).
+   *
+   * @param n number of top preferences to retain; must be positive
+   * @param userVector vector whose non-zero elements are examined
+   * @return the cutoff value, or {@link Double#NEGATIVE_INFINITY} if {@code userVector}
+   *  has no non-zero elements, so that nothing would be filtered out
+   * @throws IllegalArgumentException if {@code n} is not positive
+   */
+  private static double findTopNPrefsCutoff(int n, Vector userVector) {
+    if (n <= 0) {
+      throw new IllegalArgumentException("n must be positive: " + n);
+    }
+    // Min-heap holding the n largest values seen so far; its head is the cutoff.
+    Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
+    Iterator<Vector.Element> it = userVector.iterateNonZero();
+    while (it.hasNext()) {
+      double prefValue = it.next().get();
+      if (topPrefValues.size() < n) {
+        topPrefValues.add(prefValue);
+      } else if (prefValue > topPrefValues.peek()) {
+        topPrefValues.add(prefValue);
+        topPrefValues.poll();
+      }
+    }
+    // peek() is null for an empty vector; unboxing it directly would throw an NPE.
+    Double cutoff = topPrefValues.peek();
+    return cutoff == null ? Double.NEGATIVE_INFINITY : cutoff;
+  }
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=889767&r1=889766&r2=889767&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Fri Dec 11 19:04:23 2009
@@ -28,63 +28,30 @@
import java.io.IOException;
import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
public final class UserVectorToCooccurrenceMapper
extends MapReduceBase
implements Mapper<LongWritable, SparseVector, IntWritable, IntWritable> {
- private static final int MAX_PREFS_CONSIDERED = 50;
-
@Override
public void map(LongWritable userID,
SparseVector userVector,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException {
-
- double cutoff = userVector.size() <= MAX_PREFS_CONSIDERED ?
- Double.NEGATIVE_INFINITY : findTopNPrefsCutoff(MAX_PREFS_CONSIDERED, userVector);
-
Iterator<Vector.Element> it = userVector.iterateNonZero();
-
while (it.hasNext()) {
Vector.Element next1 = it.next();
- if (next1.get() >= cutoff) {
-
- int index1 = next1.index();
- Iterator<Vector.Element> it2 = userVector.iterateNonZero();
- IntWritable itemWritable1 = new IntWritable(index1);
-
- while (it2.hasNext()) {
- Vector.Element next2 = it2.next();
- if (next2.get() >= cutoff) {
-
- int index2 = next2.index();
- if (index1 != index2) {
- output.collect(itemWritable1, new IntWritable(index2));
- }
-
- }
+ int index1 = next1.index();
+ Iterator<Vector.Element> it2 = userVector.iterateNonZero();
+ IntWritable itemWritable1 = new IntWritable(index1);
+ while (it2.hasNext()) {
+ Vector.Element next2 = it2.next();
+ int index2 = next2.index();
+ if (index1 != index2) {
+ output.collect(itemWritable1, new IntWritable(index2));
}
-
- }
- }
- }
-
- private static double findTopNPrefsCutoff(int n, Vector userVector) {
- Queue<Double> topPrefValues = new PriorityQueue<Double>(n + 1);
- Iterator<Vector.Element> it = userVector.iterateNonZero();
- while (it.hasNext()) {
- double prefValue = it.next().get();
- if (topPrefValues.size() < n) {
- topPrefValues.add(prefValue);
- } else if (prefValue > topPrefValues.peek()) {
- topPrefValues.add(prefValue);
- topPrefValues.poll();
}
}
- return topPrefValues.peek();
}
}