You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/05/26 20:59:03 UTC
svn commit: r948538 [1/2] - in /mahout/trunk:
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/
core/src/main/java/org/apache/ma...
Author: srowen
Date: Wed May 26 18:59:02 2010
New Revision: 948538
URL: http://svn.apache.org/viewvc?rev=948538&view=rev
Log:
Recommender and related jobs now use the new Hadoop 0.20.x+ APIs exclusively
Removed:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IndexIndexWritable.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToEntityPrefsMapper.java Wed May 26 18:59:02 2010
@@ -17,20 +17,17 @@
package org.apache.mahout.cf.taste.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.apache.mahout.math.VarLongWritable;
import java.io.IOException;
import java.util.regex.Pattern;
-public abstract class ToEntityPrefsMapper extends MapReduceBase implements
+public abstract class ToEntityPrefsMapper extends
Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {
public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";
@@ -46,7 +43,8 @@ public abstract class ToEntityPrefsMappe
}
@Override
- public void configure(JobConf jobConf) {
+ public void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false);
}
@@ -54,8 +52,7 @@ public abstract class ToEntityPrefsMappe
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<VarLongWritable,VarLongWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString());
long userID = Long.parseLong(tokens[0]);
long itemID = Long.parseLong(tokens[1]);
@@ -68,10 +65,10 @@ public abstract class ToEntityPrefsMappe
itemID = temp;
}
if (booleanData) {
- output.collect(new VarLongWritable(userID), new VarLongWritable(itemID));
+ context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
} else {
float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
- output.collect(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
+ context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/ToItemPrefsMapper.java Wed May 26 18:59:02 2010
@@ -23,7 +23,8 @@ import org.apache.hadoop.io.Text;
* <h1>Input</h1>
*
* <p>
- * Intended for use with {@link org.apache.hadoop.mapred.TextInputFormat}; accepts line number / line pairs as
+ * Intended for use with {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat};
+ * accepts line number / line pairs as
* {@link org.apache.mahout.math.VarLongWritable}/{@link Text} pairs.
* </p>
*
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java Wed May 26 18:59:02 2010
@@ -25,16 +25,13 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
@@ -45,7 +42,7 @@ import org.apache.mahout.math.VectorWrit
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.map.OpenIntLongHashMap;
-public final class AggregateAndRecommendReducer extends MapReduceBase implements
+public final class AggregateAndRecommendReducer extends
Reducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> {
static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
@@ -62,7 +59,8 @@ public final class AggregateAndRecommend
private OpenIntLongHashMap indexItemIDMap;
@Override
- public void configure(JobConf jobConf) {
+ public void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
recommendationsPerUser = jobConf.getInt(RECOMMENDATIONS_PER_USER, 10);
try {
FileSystem fs = FileSystem.get(jobConf);
@@ -86,15 +84,17 @@ public final class AggregateAndRecommend
@Override
public void reduce(VarLongWritable key,
- Iterator<VectorWritable> values,
- OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
- Reporter reporter) throws IOException {
- if (!values.hasNext()) {
- return;
+ Iterable<VectorWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ Vector recommendationVector = null;
+ for (VectorWritable vectorWritable : values) {
+ recommendationVector = recommendationVector == null ?
+ vectorWritable.get() :
+ recommendationVector.plus(vectorWritable.get());
}
- Vector recommendationVector = values.next().get();
- while (values.hasNext()) {
- recommendationVector = recommendationVector.plus(values.next().get());
+ if (recommendationVector == null) {
+ return;
}
Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1,
@@ -120,7 +120,7 @@ public final class AggregateAndRecommend
List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
recommendations.addAll(topItems);
Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
- output.collect(key, new RecommendedItemsWritable(recommendations));
+ context.write(key, new RecommendedItemsWritable(recommendations));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateCombiner.java Wed May 26 18:59:02 2010
@@ -18,32 +18,28 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-public final class AggregateCombiner extends MapReduceBase implements
+public final class AggregateCombiner extends
Reducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> {
@Override
public void reduce(VarLongWritable key,
- Iterator<VectorWritable> values,
- OutputCollector<VarLongWritable,VectorWritable> output,
- Reporter reporter) throws IOException {
- if (values.hasNext()) {
- Vector partial = values.next().get();
- while (values.hasNext()) {
- partial = partial.plus(values.next().get());
- }
+ Iterable<VectorWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ Vector partial = null;
+ for (VectorWritable vectorWritable : values) {
+ partial = partial == null ? vectorWritable.get() : partial.plus(vectorWritable.get());
+ }
+ if (partial != null) {
VectorWritable vw = new VectorWritable(partial);
vw.setWritesLaxPrecision(true);
- output.collect(key, vw);
+ context.write(key, vw);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/CooccurrenceColumnWrapperMapper.java Wed May 26 18:59:02 2010
@@ -19,22 +19,18 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VectorWritable;
-public final class CooccurrenceColumnWrapperMapper extends MapReduceBase implements
- Mapper<VarIntWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
+public final class CooccurrenceColumnWrapperMapper extends
+ Mapper<VarIntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
@Override
public void map(VarIntWritable key,
VectorWritable value,
- OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
- Reporter reporter) throws IOException {
- output.collect(key, new VectorOrPrefWritable(value.get()));
+ Context context) throws IOException, InterruptedException {
+ context.write(key, new VectorOrPrefWritable(value.get()));
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java Wed May 26 18:59:02 2010
@@ -20,18 +20,15 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
-public final class ItemIDIndexMapper extends MapReduceBase implements
+public final class ItemIDIndexMapper extends
Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
private static final Pattern COMMA = Pattern.compile(",");
@@ -39,19 +36,19 @@ public final class ItemIDIndexMapper ext
private boolean transpose;
@Override
- public void configure(JobConf jobConf) {
+ public void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
}
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<VarIntWritable,VarLongWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
String[] tokens = ItemIDIndexMapper.COMMA.split(value.toString());
long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
int index = idToIndex(itemID);
- output.collect(new VarIntWritable(index), new VarLongWritable(itemID));
+ context.write(new VarIntWritable(index), new VarLongWritable(itemID));
}
static int idToIndex(long itemID) {
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java Wed May 26 18:59:02 2010
@@ -18,32 +18,27 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
-public final class ItemIDIndexReducer extends MapReduceBase implements
+public final class ItemIDIndexReducer extends
Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
@Override
public void reduce(VarIntWritable index,
- Iterator<VarLongWritable> possibleItemIDs,
- OutputCollector<VarIntWritable,VarLongWritable> output,
- Reporter reporter) throws IOException {
- if (possibleItemIDs.hasNext()) {
- long minimumItemID = Long.MAX_VALUE;
- while (possibleItemIDs.hasNext()) {
- long itemID = possibleItemIDs.next().get();
- if (itemID < minimumItemID) {
- minimumItemID = itemID;
- }
+ Iterable<VarLongWritable> possibleItemIDs,
+ Context context) throws IOException, InterruptedException {
+ long minimumItemID = Long.MAX_VALUE;
+ for (VarLongWritable varLongWritable : possibleItemIDs) {
+ long itemID = varLongWritable.get();
+ if (itemID < minimumItemID) {
+ minimumItemID = itemID;
}
- output.collect(index, new VarLongWritable(minimumItemID));
+ }
+ if (minimumItemID != Long.MAX_VALUE) {
+ context.write(index, new VarLongWritable(minimumItemID));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java Wed May 26 18:59:02 2010
@@ -20,24 +20,20 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;
-public final class PartialMultiplyMapper extends MapReduceBase implements
+public final class PartialMultiplyMapper extends
Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable> {
@Override
public void map(VarIntWritable key,
VectorAndPrefsWritable vectorAndPrefsWritable,
- OutputCollector<VarLongWritable, VectorWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
int itemIndex = key.get();
@@ -55,7 +51,7 @@ public final class PartialMultiplyMapper
excludeWritable.setWritesLaxPrecision(true);
for (long userID : userIDs) {
userIDWritable.set(userID);
- output.collect(userIDWritable, excludeWritable);
+ context.write(userIDWritable, excludeWritable);
}
VectorWritable vectorWritable = new VectorWritable();
@@ -69,7 +65,7 @@ public final class PartialMultiplyMapper
cooccurrenceColumn.times(prefValue);
userIDWritable.set(userID);
vectorWritable.set(partialProduct);
- output.collect(userIDWritable, vectorWritable);
+ context.write(userIDWritable, vectorWritable);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Wed May 26 18:59:02 2010
@@ -25,16 +25,14 @@ import java.util.regex.Pattern;
import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
@@ -68,7 +66,7 @@ public final class RecommenderJob extend
public static final String BOOLEAN_DATA = "booleanData";
@Override
- public int run(String[] args) throws IOException {
+ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Option numReccomendationsOpt = AbstractJob.buildOption("numRecommendations", "n",
"Number of recommendations per user", "10");
@@ -84,87 +82,97 @@ public final class RecommenderJob extend
}
Configuration originalConf = getConf();
- String inputPath = originalConf.get("mapred.input.dir");
- String outputPath = originalConf.get("mapred.output.dir");
- String tempDirPath = parsedArgs.get("--tempDir");
+ Path inputPath = new Path(originalConf.get("mapred.input.dir"));
+ Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+ Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
String usersFile = parsedArgs.get("--usersFile");
boolean booleanData = Boolean.valueOf(parsedArgs.get("--booleanData"));
- String userVectorPath = tempDirPath + "/userVectors";
- String itemIDIndexPath = tempDirPath + "/itemIDIndex";
- String cooccurrencePath = tempDirPath + "/cooccurrence";
- String partialMultiplyPath = tempDirPath + "/partialMultiply";
+ Path userVectorPath = new Path(tempDirPath, "userVectors");
+ Path itemIDIndexPath = new Path(tempDirPath, "itemIDIndex");
+ Path cooccurrencePath = new Path(tempDirPath, "cooccurrence");
+ Path prePartialMultiplyPath1 = new Path(tempDirPath, "prePartialMultiply1");
+ Path prePartialMultiplyPath2 = new Path(tempDirPath, "prePartialMultiply2");
+ Path partialMultiplyPath = new Path(tempDirPath, "partialMultiply");
AtomicInteger currentPhase = new AtomicInteger();
- JobConf itemIDIndexConf = prepareJobConf(
- inputPath, itemIDIndexPath, TextInputFormat.class,
- ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
- ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
- SequenceFileOutputFormat.class);
- itemIDIndexConf.setClass("mapred.combiner.class", ItemIDIndexReducer.class, Reducer.class);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- JobClient.runJob(itemIDIndexConf);
+ Job itemIDIndex = prepareJob(
+ inputPath, itemIDIndexPath, TextInputFormat.class,
+ ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
+ ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
+ SequenceFileOutputFormat.class);
+ itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+ itemIDIndex.waitForCompletion(true);
}
-
- JobConf toUserVectorConf = prepareJobConf(
- inputPath, userVectorPath, TextInputFormat.class,
- ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
- ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
- SequenceFileOutputFormat.class);
- toUserVectorConf.setBoolean(BOOLEAN_DATA, booleanData);
+
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- JobClient.runJob(toUserVectorConf);
+ Job toUserVector = prepareJob(
+ inputPath, userVectorPath, TextInputFormat.class,
+ ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+ ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
+ SequenceFileOutputFormat.class);
+ toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData);
+ toUserVector.waitForCompletion(true);
}
- JobConf toCooccurrenceConf = prepareJobConf(
- userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
- UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
- UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
- SequenceFileOutputFormat.class);
- setIOSort(toCooccurrenceConf);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- JobClient.runJob(toCooccurrenceConf);
+ Job toCooccurrence = prepareJob(
+ userVectorPath, cooccurrencePath, SequenceFileInputFormat.class,
+ UserVectorToCooccurrenceMapper.class, VarIntWritable.class, VarIntWritable.class,
+ UserVectorToCooccurrenceReducer.class, VarIntWritable.class, VectorWritable.class,
+ SequenceFileOutputFormat.class);
+ setIOSort(toCooccurrence);
+ toCooccurrence.waitForCompletion(true);
}
- JobConf partialMultiplyConf = prepareJobConf(
- cooccurrencePath, partialMultiplyPath, SequenceFileInputFormat.class,
- CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
- ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
- SequenceFileOutputFormat.class);
- MultipleInputs.addInputPath(
- partialMultiplyConf,
- new Path(cooccurrencePath).makeQualified(FileSystem.get(partialMultiplyConf)),
- SequenceFileInputFormat.class, CooccurrenceColumnWrapperMapper.class);
- MultipleInputs.addInputPath(
- partialMultiplyConf,
- new Path(userVectorPath).makeQualified(FileSystem.get(partialMultiplyConf)),
- SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
- if (usersFile != null) {
- partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
- }
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- JobClient.runJob(partialMultiplyConf);
+ Job prePartialMultiply1 = prepareJob(
+ cooccurrencePath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
+ CooccurrenceColumnWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ SequenceFileOutputFormat.class);
+ prePartialMultiply1.waitForCompletion(true);
+
+ Job prePartialMultiply2 = prepareJob(
+ userVectorPath, prePartialMultiplyPath2, SequenceFileInputFormat.class,
+ UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ SequenceFileOutputFormat.class);
+ if (usersFile != null) {
+ prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
+ }
+ prePartialMultiply2.waitForCompletion(true);
+
+ Job partialMultiply = prepareJob(
+ new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath, SequenceFileInputFormat.class,
+ Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+ SequenceFileOutputFormat.class);
+ partialMultiply.waitForCompletion(true);
}
- JobConf aggregateAndRecommendConf = prepareJobConf(
- partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
- PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
- AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
- TextOutputFormat.class);
- setIOSort(aggregateAndRecommendConf);
- aggregateAndRecommendConf.setClass("mapred.combiner.class", AggregateCombiner.class, Reducer.class);
- aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath);
- aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- JobClient.runJob(aggregateAndRecommendConf);
+ Job aggregateAndRecommend = prepareJob(
+ partialMultiplyPath, outputPath, SequenceFileInputFormat.class,
+ PartialMultiplyMapper.class, VarLongWritable.class, VectorWritable.class,
+ AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
+ TextOutputFormat.class);
+ Configuration jobConf = aggregateAndRecommend.getConfiguration();
+ setIOSort(aggregateAndRecommend);
+ aggregateAndRecommend.setCombinerClass(AggregateCombiner.class);
+ jobConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, itemIDIndexPath.toString());
+ jobConf.setInt(AggregateAndRecommendReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
+ aggregateAndRecommend.waitForCompletion(true);
}
return 0;
}
- private static void setIOSort(JobConf conf) {
+ private static void setIOSort(Job job) {
+ Configuration conf = job.getConfiguration();
conf.setInt("io.sort.factor", 100);
int assumedHeapSize = 512;
String javaOpts = conf.get("mapred.child.java.opts");
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorReducer.java Wed May 26 18:59:02 2010
@@ -18,12 +18,8 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
@@ -47,20 +43,15 @@ import org.apache.mahout.math.VectorWrit
* {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
* </p>
*/
-public final class ToUserVectorReducer extends MapReduceBase implements
+public final class ToUserVectorReducer extends
Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
@Override
public void reduce(VarLongWritable userID,
- Iterator<VarLongWritable> itemPrefs,
- OutputCollector<VarLongWritable,VectorWritable> output,
- Reporter reporter) throws IOException {
- if (!itemPrefs.hasNext()) {
- return;
- }
+ Iterable<VarLongWritable> itemPrefs,
+ Context context) throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- while (itemPrefs.hasNext()) {
- VarLongWritable itemPref = itemPrefs.next();
+ for (VarLongWritable itemPref : itemPrefs) {
int index = ItemIDIndexMapper.idToIndex(itemPref.get());
float value;
if (itemPref instanceof EntityPrefWritable) {
@@ -73,7 +64,7 @@ public final class ToUserVectorReducer e
VectorWritable vw = new VectorWritable(userVector);
vw.setWritesLaxPrecision(true);
- output.collect(userID, vw);
+ context.write(userID, vw);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java Wed May 26 18:59:02 2010
@@ -19,30 +19,24 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
-public final class ToVectorAndPrefReducer extends MapReduceBase implements
+public final class ToVectorAndPrefReducer extends
Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
@Override
public void reduce(VarIntWritable key,
- Iterator<VectorOrPrefWritable> values,
- OutputCollector<VarIntWritable,VectorAndPrefsWritable> output,
- Reporter reporter) throws IOException {
+ Iterable<VectorOrPrefWritable> values,
+ Context context) throws IOException, InterruptedException {
List<Long> userIDs = new ArrayList<Long>();
List<Float> prefValues = new ArrayList<Float>();
Vector cooccurrenceColumn = null;
- while (values.hasNext()) {
- VectorOrPrefWritable value = values.next();
+ for (VectorOrPrefWritable value : values) {
if (value.getVector() == null) {
// Then this is a user-pref value
userIDs.add(value.getUserID());
@@ -61,7 +55,7 @@ public final class ToVectorAndPrefReduce
}
VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues);
- output.collect(key, vectorAndPrefs);
+ context.write(key, vectorAndPrefs);
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java Wed May 26 18:59:02 2010
@@ -21,14 +21,11 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.PriorityQueue;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.impl.common.FastIDSet;
import org.apache.mahout.common.FileLineIterable;
import org.apache.mahout.math.VarIntWritable;
@@ -36,7 +33,7 @@ import org.apache.mahout.math.VarLongWri
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-public final class UserVectorSplitterMapper extends MapReduceBase implements
+public final class UserVectorSplitterMapper extends
Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
private static final int MAX_PREFS_CONSIDERED = 10;
@@ -45,7 +42,8 @@ public final class UserVectorSplitterMap
private FastIDSet usersToRecommendFor;
@Override
- public void configure(JobConf jobConf) {
+ public void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
try {
FileSystem fs = FileSystem.get(jobConf);
String usersFilePathString = jobConf.get(USERS_FILE);
@@ -67,8 +65,7 @@ public final class UserVectorSplitterMap
@Override
public void map(VarLongWritable key,
VectorWritable value,
- OutputCollector<VarIntWritable,VectorOrPrefWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
long userID = key.get();
if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
return;
@@ -81,7 +78,7 @@ public final class UserVectorSplitterMap
Vector.Element e = it.next();
itemIndexWritable.set(e.index());
vectorOrPref.set(userID, (float) e.get());
- output.collect(itemIndexWritable, vectorOrPref);
+ context.write(itemIndexWritable, vectorOrPref);
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceMapper.java Wed May 26 18:59:02 2010
@@ -22,17 +22,14 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.PriorityQueue;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntIntHashMap;
-public final class UserVectorToCooccurrenceMapper extends MapReduceBase implements
+public final class UserVectorToCooccurrenceMapper extends
Mapper<VarLongWritable,VectorWritable,VarIntWritable,VarIntWritable> {
private static final int MAX_PREFS_CONSIDERED = 100;
@@ -46,8 +43,7 @@ public final class UserVectorToCooccurre
@Override
public void map(VarLongWritable userID,
VectorWritable userVectorWritable,
- OutputCollector<VarIntWritable,VarIntWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
Vector userVector = userVectorWritable.get();
countSeen(userVector);
@@ -56,7 +52,7 @@ public final class UserVectorToCooccurre
userVector = maybePruneUserVector(userVector);
int newSize = userVector.getNumNondefaultElements();
if (newSize < originalSize) {
- reporter.incrCounter(Counters.USER_PREFS_SKIPPED, originalSize - newSize);
+ context.getCounter(Counters.USER_PREFS_SKIPPED).increment(originalSize - newSize);
}
Iterator<Vector.Element> it = userVector.iterateNonZero();
@@ -69,7 +65,7 @@ public final class UserVectorToCooccurre
while (it2.hasNext()) {
int index2 = it2.next().index();
itemIndex2.set(index2);
- output.collect(itemIndex1, itemIndex2);
+ context.write(itemIndex1, itemIndex2);
}
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorToCooccurrenceReducer.java Wed May 26 18:59:02 2010
@@ -18,33 +18,28 @@
package org.apache.mahout.cf.taste.hadoop.item;
import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-public final class UserVectorToCooccurrenceReducer extends MapReduceBase implements
+public final class UserVectorToCooccurrenceReducer extends
Reducer<VarIntWritable,VarIntWritable,VarIntWritable,VectorWritable> {
@Override
public void reduce(VarIntWritable itemIndex1,
- Iterator<VarIntWritable> itemIndex2s,
- OutputCollector<VarIntWritable,VectorWritable> output,
- Reporter reporter) throws IOException {
+ Iterable<VarIntWritable> itemIndex2s,
+ Context context) throws IOException, InterruptedException {
Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
- while (itemIndex2s.hasNext()) {
- int itemIndex2 = itemIndex2s.next().get();
+ for (VarIntWritable varIntWritable : itemIndex2s) {
+ int itemIndex2 = varIntWritable.get();
cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
}
VectorWritable vw = new VectorWritable(cooccurrenceRow);
vw.setWritesLaxPrecision(true);
- output.collect(itemIndex1, vw);
+ context.write(itemIndex1, vw);
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Wed May 26 18:59:02 2010
@@ -22,13 +22,14 @@ import java.util.Map;
import org.apache.commons.cli2.Option;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
@@ -103,7 +104,7 @@ import org.apache.mahout.math.VarLongWri
public final class RecommenderJob extends AbstractJob {
@Override
- public int run(String[] args) throws IOException {
+ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Option recommendClassOpt = AbstractJob.buildOption("recommenderClassName", "r",
"Name of recommender class to instantiate");
@@ -119,26 +120,35 @@ public final class RecommenderJob extend
}
Configuration originalConf = getConf();
- String inputFile = originalConf.get("mapred.input.dir");
- String outputPath = originalConf.get("mapred.output.dir");
- String usersFile = parsedArgs.get("--usersFile");
- if (usersFile == null) {
+ Path inputFile = new Path(originalConf.get("mapred.input.dir"));
+ Path outputPath = new Path(originalConf.get("mapred.output.dir"));
+ Path usersFile;
+ if (parsedArgs.get("--usersFile") == null) {
usersFile = inputFile;
+ } else {
+ usersFile = new Path(parsedArgs.get("--usersFile"));
}
String recommendClassName = parsedArgs.get("--recommenderClassName");
int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
- JobConf jobConf = prepareJobConf(usersFile, outputPath, TextInputFormat.class,
- UserIDsMapper.class, VarLongWritable.class, NullWritable.class, RecommenderReducer.class,
- VarLongWritable.class, RecommendedItemsWritable.class, TextOutputFormat.class);
-
+ Job job = prepareJob(usersFile,
+ outputPath,
+ TextInputFormat.class,
+ UserIDsMapper.class,
+ VarLongWritable.class,
+ NullWritable.class,
+ RecommenderReducer.class,
+ VarLongWritable.class,
+ RecommendedItemsWritable.class,
+ TextOutputFormat.class);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ Configuration jobConf = job.getConfiguration();
jobConf.set(RecommenderReducer.RECOMMENDER_CLASS_NAME, recommendClassName);
jobConf.setInt(RecommenderReducer.RECOMMENDATIONS_PER_USER, recommendationsPerUser);
- jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile);
- jobConf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
-
- JobClient.runJob(jobConf);
+ jobConf.set(RecommenderReducer.DATA_MODEL_FILE, inputFile.toString());
+
+ job.waitForCompletion(true);
return 0;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderReducer.java Wed May 26 18:59:02 2010
@@ -24,14 +24,11 @@ import java.lang.reflect.InvocationTarge
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
@@ -49,7 +46,7 @@ import org.apache.mahout.math.VarLongWri
*
* @see RecommenderJob
*/
-public final class RecommenderReducer extends MapReduceBase implements
+public final class RecommenderReducer extends
Reducer<VarLongWritable,NullWritable,VarLongWritable,RecommendedItemsWritable> {
static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
@@ -60,7 +57,8 @@ public final class RecommenderReducer ex
private int recommendationsPerUser;
@Override
- public void configure(JobConf jobConf) {
+ public void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
String dataModelFile = jobConf.get(DATA_MODEL_FILE);
String recommenderClassName = jobConf.get(RECOMMENDER_CLASS_NAME);
FileDataModel fileDataModel;
@@ -95,9 +93,8 @@ public final class RecommenderReducer ex
@Override
public void reduce(VarLongWritable key,
- Iterator<NullWritable> values,
- OutputCollector<VarLongWritable,RecommendedItemsWritable> output,
- Reporter reporter) throws IOException {
+ Iterable<NullWritable> values,
+ Context context) throws IOException, InterruptedException {
long userID = key.get();
List<RecommendedItem> recommendedItems;
try {
@@ -112,8 +109,8 @@ public final class RecommenderReducer ex
}
}
RecommendedItemsWritable writable = new RecommendedItemsWritable(recommendedItems);
- output.collect(key, writable);
- reporter.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
- reporter.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
+ context.write(key, writable);
+ context.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
+ context.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/UserIDsMapper.java Wed May 26 18:59:02 2010
@@ -22,27 +22,23 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
/**
* Extracts and emits all user IDs from the users file, or input file.
*/
-public final class UserIDsMapper extends MapReduceBase implements
+public final class UserIDsMapper extends
Mapper<LongWritable,Text, VarLongWritable,NullWritable> {
@Override
public void map(LongWritable key,
Text value,
- OutputCollector<VarLongWritable,NullWritable> output,
- Reporter reporter) throws IOException {
+ Context context) throws IOException, InterruptedException {
String line = value.toString();
int comma = line.indexOf(',');
long userID = comma >= 0 ? Long.parseLong(line.substring(0, comma)) : Long.parseLong(line);
- output.collect(new VarLongWritable(userID), NullWritable.get());
+ context.write(new VarLongWritable(userID), NullWritable.get());
}
}
\ No newline at end of file
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/AbstractDistributedItemSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
/**
* abstract base class for all implementations of {@link DistributedItemSimilarity} that does not give a
* weight to item vectors and only ensures that the result is within [-1,1]
@@ -27,9 +25,10 @@ public abstract class AbstractDistribute
implements DistributedItemSimilarity {
@Override
- public final double similarity(Iterator<CoRating> coratings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
+ public final double similarity(Iterable<CoRating> coratings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
double result = doComputeResult(coratings, weightOfItemVectorX, weightOfItemVectorY, numberOfUsers);
@@ -46,11 +45,11 @@ public abstract class AbstractDistribute
* when they need a weight
*/
@Override
- public double weightOfItemVector(Iterator<Float> prefValues) {
+ public double weightOfItemVector(Iterable<Float> prefValues) {
return Double.NaN;
}
- protected abstract double doComputeResult(Iterator<CoRating> coratings,
+ protected abstract double doComputeResult(Iterable<CoRating> coratings,
double weightOfItemVectorX, double weightOfItemVectorY,
int numberOfUsers);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedEuclideanDistanceSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
/**
@@ -27,15 +25,15 @@ import org.apache.mahout.cf.taste.impl.s
public class DistributedEuclideanDistanceSimilarity extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coratings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
+ protected double doComputeResult(Iterable<CoRating> coratings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
double n=0;
double sumXYdiff2 = 0;
- while (coratings.hasNext()) {
- CoRating coRating = coratings.next();
+ for (CoRating coRating : coratings) {
double diff = coRating.getPrefValueX() - coRating.getPrefValueY();
sumXYdiff2 += diff * diff;
n++;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
/**
* Modelling the pairwise similarity computation of items in a distributed manner
*/
@@ -30,7 +28,7 @@ public interface DistributedItemSimilari
* @param prefValues
* @return
*/
- double weightOfItemVector(Iterator<Float> prefValues);
+ double weightOfItemVector(Iterable<Float> prefValues);
/**
* compute the similarity for a pair of item-vectors
@@ -41,7 +39,7 @@ public interface DistributedItemSimilari
* @param numberOfUsers the overall number of users
* @return
*/
- double similarity(Iterator<CoRating> coratings,
+ double similarity(Iterable<CoRating> coratings,
double weightOfItemVectorX,
double weightOfItemVectorY,
int numberOfUsers);
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedLogLikelihoodSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
/**
@@ -27,13 +25,13 @@ import org.apache.mahout.cf.taste.impl.s
public class DistributedLogLikelihoodSimilarity extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coratings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
+ protected double doComputeResult(Iterable<CoRating> coratings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
int preferringXandY = 0;
- while (coratings.hasNext()) {
- coratings.next();
+ for (CoRating corating : coratings) {
preferringXandY++;
}
@@ -53,10 +51,9 @@ public class DistributedLogLikelihoodSim
}
@Override
- public double weightOfItemVector(Iterator<Float> prefValues) {
- double nonZeroEntries = 0;
- while (prefValues.hasNext()) {
- prefValues.next();
+ public double weightOfItemVector(Iterable<Float> prefValues) {
+ int nonZeroEntries = 0;
+ for (Float prefValue : prefValues) {
nonZeroEntries++;
}
return nonZeroEntries;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedPearsonCorrelationSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
/**
@@ -27,9 +25,10 @@ import org.apache.mahout.cf.taste.impl.s
public class DistributedPearsonCorrelationSimilarity extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coRatings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
+ protected double doComputeResult(Iterable<CoRating> coRatings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
int count = 0;
double sumX = 0.0;
@@ -38,8 +37,7 @@ public class DistributedPearsonCorrelati
double sumX2 = 0.0;
double sumY2 = 0.0;
- while (coRatings.hasNext()) {
- CoRating coRating = coRatings.next();
+ for (CoRating coRating : coRatings) {
double x = coRating.getPrefValueX();
double y = coRating.getPrefValueY();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedTanimotoCoefficientSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
import org.apache.mahout.cf.taste.impl.similarity.TanimotoCoefficientSimilarity;
/**
@@ -27,13 +25,13 @@ import org.apache.mahout.cf.taste.impl.s
public class DistributedTanimotoCoefficientSimilarity extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coratings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
-
- double preferringXAndY = 0;
- while (coratings.hasNext()) {
- coratings.next();
+ protected double doComputeResult(Iterable<CoRating> coratings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
+
+ int preferringXAndY = 0;
+ for (CoRating coRating : coratings) {
preferringXAndY++;
}
@@ -45,10 +43,9 @@ public class DistributedTanimotoCoeffici
}
@Override
- public double weightOfItemVector(Iterator<Float> prefValues) {
- double nonZeroEntries = 0;
- while (prefValues.hasNext()) {
- prefValues.next();
+ public double weightOfItemVector(Iterable<Float> prefValues) {
+ int nonZeroEntries = 0;
+ for (Float prefValue : prefValues) {
nonZeroEntries++;
}
return nonZeroEntries;
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredCosineSimilarity.java Wed May 26 18:59:02 2010
@@ -17,8 +17,6 @@
package org.apache.mahout.cf.taste.hadoop.similarity;
-import java.util.Iterator;
-
import org.apache.mahout.cf.taste.impl.similarity.UncenteredCosineSimilarity;
/**
@@ -27,17 +25,17 @@ import org.apache.mahout.cf.taste.impl.s
public class DistributedUncenteredCosineSimilarity extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coratings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
+ protected double doComputeResult(Iterable<CoRating> coratings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
int n = 0;
- double sumXY = 0;
- double sumX2 = 0;
- double sumY2 = 0;
+ double sumXY = 0.0;
+ double sumX2 = 0.0;
+ double sumY2 = 0.0;
- while (coratings.hasNext()) {
- CoRating coRating = coratings.next();
+ for (CoRating coRating : coratings) {
double x = coRating.getPrefValueX();
double y = coRating.getPrefValueY();
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedUncenteredZeroAssumingCosineSimilarity.java Wed May 26 18:59:02 2010
@@ -27,27 +27,26 @@ public final class DistributedUncentered
extends AbstractDistributedItemSimilarity {
@Override
- protected double doComputeResult(Iterator<CoRating> coRatings,
- double weightOfItemVectorX, double weightOfItemVectorY,
- int numberOfUsers) {
-
- double sumXY = 0;
- while (coRatings.hasNext()) {
- CoRating coRating = coRatings.next();
+ protected double doComputeResult(Iterable<CoRating> coRatings,
+ double weightOfItemVectorX,
+ double weightOfItemVectorY,
+ int numberOfUsers) {
+
+ double sumXY = 0.0;
+ for (CoRating coRating : coRatings) {
sumXY += coRating.getPrefValueX() * coRating.getPrefValueY();
}
- if (sumXY == 0) {
+ if (sumXY == 0.0) {
return Double.NaN;
}
return sumXY / (weightOfItemVectorX * weightOfItemVectorY);
}
@Override
- public double weightOfItemVector(Iterator<Float> prefValues) {
+ public double weightOfItemVector(Iterable<Float> prefValues) {
double length = 0.0;
- while (prefValues.hasNext()) {
- float prefValue = prefValues.next();
+ for (float prefValue : prefValues) {
if (!Float.isNaN(prefValue)) {
length += prefValue * prefValue;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CopreferredItemsMapper.java Wed May 26 18:59:02 2010
@@ -19,10 +19,7 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.similarity.CoRating;
import org.apache.mahout.math.VarLongWritable;
@@ -30,15 +27,14 @@ import org.apache.mahout.math.VarLongWri
* map out each pair of items that appears in the same user-vector together with the multiplied vector lengths
* of the associated item vectors
*/
-public final class CopreferredItemsMapper extends MapReduceBase
- implements Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
+public final class CopreferredItemsMapper extends
+ Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating> {
@Override
public void map(VarLongWritable user,
ItemPrefWithItemVectorWeightArrayWritable itemPrefsArray,
- OutputCollector<ItemPairWritable, CoRating> output,
- Reporter reporter)
- throws IOException {
+ Context context)
+ throws IOException, InterruptedException {
ItemPrefWithItemVectorWeightWritable[] itemPrefs = itemPrefsArray.getItemPrefs();
@@ -52,7 +48,7 @@ public final class CopreferredItemsMappe
long itemAID = Math.min(itemNID, itemM.getItemID());
long itemBID = Math.max(itemNID, itemM.getItemID());
ItemPairWritable pair = new ItemPairWritable(itemAID, itemBID, itemNWeight, itemM.getWeight());
- output.collect(pair, new CoRating(itemNValue, itemM.getPrefValue()));
+ context.write(pair, new CoRating(itemNValue, itemM.getPrefValue()));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersKeyWritable.java Wed May 26 18:59:02 2010
@@ -22,10 +22,10 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Varint;
@@ -81,15 +81,13 @@ public class CountUsersKeyWritable imple
/**
* all userIDs go to the same partition
*/
- public static class CountUsersPartitioner implements Partitioner<CountUsersKeyWritable,VarLongWritable> {
+ public static class CountUsersPartitioner extends Partitioner<CountUsersKeyWritable,VarLongWritable> {
@Override
public int getPartition(CountUsersKeyWritable key, VarLongWritable value, int numPartitions) {
return 0;
}
- @Override
- public void configure(JobConf conf) {}
}
/**
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersMapper.java Wed May 26 18:59:02 2010
@@ -22,29 +22,25 @@ import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
/**
- * maps out the userIDs in a way that we can use a secondary sort on them
+ * Maps out the userIDs in a way that we can use a secondary sort on them
*/
-public class CountUsersMapper extends MapReduceBase
- implements Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
+public class CountUsersMapper extends
+ Mapper<LongWritable,Text,CountUsersKeyWritable, VarLongWritable> {
private static final Pattern DELIMITER = Pattern.compile("[\t,]");
@Override
- public void map(LongWritable arg0, Text value,
- OutputCollector<CountUsersKeyWritable,VarLongWritable> out, Reporter reporter)
- throws IOException {
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
String[] tokens = DELIMITER.split(value.toString());
long userID = Long.parseLong(tokens[0]);
- out.collect(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
+ context.write(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
}
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java?rev=948538&r1=948537&r2=948538&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/CountUsersReducer.java Wed May 26 18:59:02 2010
@@ -18,13 +18,9 @@
package org.apache.mahout.cf.taste.hadoop.similarity.item;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
@@ -32,25 +28,26 @@ import org.apache.mahout.math.VarLongWri
* counts all unique users, we ensure that we see userIDs sorted in ascending order via
* secondary sort, so we don't have to buffer all of them
*/
-public class CountUsersReducer extends MapReduceBase
- implements Reducer<CountUsersKeyWritable,VarLongWritable, VarIntWritable,NullWritable> {
+public class CountUsersReducer extends
+ Reducer<CountUsersKeyWritable,VarLongWritable, VarIntWritable,NullWritable> {
@Override
- public void reduce(CountUsersKeyWritable key, Iterator<VarLongWritable> userIDs,
- OutputCollector<VarIntWritable,NullWritable> out, Reporter reporter)
- throws IOException {
+ public void reduce(CountUsersKeyWritable key,
+ Iterable<VarLongWritable> userIDs,
+ Context context)
+ throws IOException, InterruptedException {
long lastSeenUserID = Long.MIN_VALUE;
int numberOfUsers = 0;
- while (userIDs.hasNext()) {
- long currentUserID = userIDs.next().get();
+ for (VarLongWritable writable : userIDs) {
+ long currentUserID = writable.get();
if (currentUserID > lastSeenUserID) {
lastSeenUserID = currentUserID;
numberOfUsers++;
}
}
- out.collect(new VarIntWritable(numberOfUsers), NullWritable.get());
+ context.write(new VarIntWritable(numberOfUsers), NullWritable.get());
}
}