You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2009/07/15 21:53:39 UTC
svn commit: r794382 -
/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/
Author: srowen
Date: Wed Jul 15 19:53:38 2009
New Revision: 794382
URL: http://svn.apache.org/viewvc?rev=794382&view=rev
Log:
Might still have some 0.2.0 issues but committing my migration
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java?rev=794382&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java Wed Jul 15 19:53:38 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/** Copied from Hadoop 0.19. Replace when Hadoop 0.20+ makes Reducer non-abstract. */
+public class IdentityReducer<K, V> extends Reducer<K, V, K, V> {
+
+ @Override
+ protected void reduce(K key, Iterable<V> values, Context context
+ ) throws IOException, InterruptedException {
+ for (V value : values) {
+ context.write(key, value);
+ }
+ }
+
+}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java Wed Jul 15 19:53:38 2009
@@ -17,16 +17,18 @@
package org.apache.mahout.cf.taste.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
import org.apache.mahout.cf.taste.recommender.Recommender;
import java.io.IOException;
@@ -34,53 +36,48 @@
/**
* <p>This class configures and runs a {@link RecommenderMapper} using Hadoop.</p>
*
- * <p>Command line arguments are:</p>
- * <ol>
- * <li>Fully-qualified class name of {@link Recommender} to use to make recommendations.
- * Note that it must have a constructor which takes a {@link org.apache.mahout.cf.taste.model.DataModel}
- * argument.</li>
- * <li>Number of recommendations to compute per user</li>
- * <li>Location of a text file containing user IDs for which recommendations should be computed,
- * one per line</li>
- * <li>Location of a data model file containing preference data, suitable for use with
- * {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
- * <li>Output path where reducer output should go</li>
- * </ol>
+ * <p>Command line arguments are:</p> <ol> <li>Fully-qualified class name of {@link Recommender} to use to make
+ * recommendations. Note that it must have a constructor which takes a {@link org.apache.mahout.cf.taste.model.DataModel}
+ * argument.</li> <li>Number of recommendations to compute per user</li> <li>Location of a text file containing user IDs
+ * for which recommendations should be computed, one per line</li> <li>Location of a data model file containing
+ * preference data, suitable for use with {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
+ * <li>Output path where reducer output should go</li> </ol>
*
* <p>Example:</p>
*
* <p><code>org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender 10 path/to/users.txt
- * path/to/data.csv path/to/reducerOutputDir 5</code></p>
- *
- * <p>TODO I am not a bit sure this works yet in a real distributed environment.</p>
+ * path/to/data.csv path/to/reducerOutputDir 5</code></p>
*/
-public final class RecommenderJob {
- private RecommenderJob() {
+public final class RecommenderJob extends Job {
+
+ public RecommenderJob(Configuration jobConf) throws IOException {
+ super(jobConf);
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
String recommendClassName = args[0];
int recommendationsPerUser = Integer.parseInt(args[1]);
String userIDFile = args[2];
String dataModelFile = args[3];
String outputPath = args[4];
- JobConf jobConf =
+ Configuration jobConf =
buildJobConf(recommendClassName, recommendationsPerUser, userIDFile, dataModelFile, outputPath);
- JobClient.runJob(jobConf);
+ Job job = new RecommenderJob(jobConf);
+ job.waitForCompletion(true);
}
- public static JobConf buildJobConf(String recommendClassName,
- int recommendationsPerUser,
- String userIDFile,
- String dataModelFile,
- String outputPath) throws IOException {
+ public static Configuration buildJobConf(String recommendClassName,
+ int recommendationsPerUser,
+ String userIDFile,
+ String dataModelFile,
+ String outputPath) throws IOException {
- Path userIDFilePath = new Path(userIDFile);
- Path outputPathPath = new Path(outputPath);
+ Configuration jobConf = new Configuration();
+ FileSystem fs = FileSystem.get(jobConf);
- JobConf jobConf = new JobConf(RecommenderJob.class);
+ Path userIDFilePath = new Path(userIDFile).makeQualified(fs);
+ Path outputPathPath = new Path(outputPath).makeQualified(fs);
- FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
if (fs.exists(outputPathPath)) {
fs.delete(outputPathPath, true);
}
@@ -89,19 +86,19 @@
jobConf.set(RecommenderMapper.RECOMMENDATIONS_PER_USER, String.valueOf(recommendationsPerUser));
jobConf.set(RecommenderMapper.DATA_MODEL_FILE, dataModelFile);
- jobConf.setInputFormat(TextInputFormat.class);
- FileInputFormat.setInputPaths(jobConf, userIDFilePath);
+ jobConf.setClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class);
+ jobConf.set("mapred.input.dir", StringUtils.escapeString(userIDFilePath.toString()));
- jobConf.setMapperClass(RecommenderMapper.class);
- jobConf.setMapOutputKeyClass(Text.class);
- jobConf.setMapOutputValueClass(RecommendedItemsWritable.class);
-
- jobConf.setReducerClass(IdentityReducer.class);
- jobConf.setOutputKeyClass(Text.class);
- jobConf.setOutputValueClass(RecommendedItemsWritable.class);
+ jobConf.setClass("mapred.mapper.class", RecommenderMapper.class, Mapper.class);
+ jobConf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ jobConf.setClass("mapred.mapoutput.value.class", RecommendedItemsWritable.class, Object.class);
+
+ jobConf.setClass("mapred.reducer.class", IdentityReducer.class, Reducer.class);
+ jobConf.setClass("mapred.output.key.class", Text.class, Object.class);
+ jobConf.setClass("mapred.output.value.class", RecommendedItemsWritable.class, Object.class);
- jobConf.setOutputFormat(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+ jobConf.setClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class);
+ jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
return jobConf;
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java Wed Jul 15 19:53:38 2009
@@ -17,15 +17,12 @@
package org.apache.mahout.cf.taste.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.model.DataModel;
@@ -39,20 +36,18 @@
import java.util.List;
/**
- * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and
- * for each unique user ID, computes recommendations with the configured {@link Recommender}.
- * The results are output as {@link RecommendedItemsWritable}.</p>
+ * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and for each unique user
+ * ID, computes recommendations with the configured {@link Recommender}. The results are output as {@link
+ * RecommendedItemsWritable}.</p>
*
- * <p>Note that there is no corresponding {@link org.apache.hadoop.mapred.Reducer}; this
- * implementation can only partially take advantage of the mapreduce paradigm and only
- * really leverages it for easy parallelization. Therefore, use the
- * {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
+ * <p>Note that there is no corresponding {@link org.apache.hadoop.mapred.Reducer}; this implementation can only
+ * partially take advantage of the mapreduce paradigm and only really leverages it for easy parallelization. Therefore,
+ * use the {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
*
* @see RecommenderJob
*/
public final class RecommenderMapper
- extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, RecommendedItemsWritable> {
+ extends Mapper<LongWritable, Text, Text, RecommendedItemsWritable> {
static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -62,10 +57,8 @@
private int recommendationsPerUser;
@Override
- public void map(LongWritable key,
- Text value,
- OutputCollector<Text, RecommendedItemsWritable> output,
- Reporter reporter) throws IOException {
+ protected void map(LongWritable key, Text value,
+ Context context) throws IOException, InterruptedException {
String userID = value.toString();
List<RecommendedItem> recommendedItems;
try {
@@ -74,13 +67,14 @@
throw new RuntimeException(te);
}
RecommendedItemsWritable writable = new RecommendedItemsWritable(recommendedItems);
- output.collect(new Text(userID), writable);
- reporter.incrCounter(ReducerMetrics.USERS_PROCESSED, 1L);
- reporter.incrCounter(ReducerMetrics.RECOMMENDATIONS_MADE, recommendedItems.size());
+ context.write(new Text(userID), writable);
+ context.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
+ context.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
}
@Override
- public void configure(JobConf jobConf) {
+ protected void setup(Context context) {
+ Configuration jobConf = context.getConfiguration();
String dataModelFile = jobConf.get(DATA_MODEL_FILE);
String recommenderClassName = jobConf.get(RECOMMENDER_CLASS_NAME);
FileDataModel fileDataModel;
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java Wed Jul 15 19:53:38 2009
@@ -17,58 +17,61 @@
package org.apache.mahout.cf.taste.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
-/**
- */
-public final class SlopeOneDiffsToAveragesJob {
- private SlopeOneDiffsToAveragesJob() {
+public final class SlopeOneDiffsToAveragesJob extends Job {
+
+ private SlopeOneDiffsToAveragesJob(Configuration jobConf) throws IOException {
+ super(jobConf);
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
String prefsFile = args[0];
String outputPath = args[1];
- JobConf jobConf = buildJobConf(prefsFile, outputPath);
- JobClient.runJob(jobConf);
+ Configuration jobConf = buildJobConf(prefsFile, outputPath);
+ Job job = new SlopeOneDiffsToAveragesJob(jobConf);
+ job.waitForCompletion(true);
}
- public static JobConf buildJobConf(String prefsFile,
- String outputPath) throws IOException {
+ public static Configuration buildJobConf(String prefsFile,
+ String outputPath) throws IOException {
- Path prefsFilePath = new Path(prefsFile);
- Path outputPathPath = new Path(outputPath);
+ Configuration jobConf = new Configuration();
+ FileSystem fs = FileSystem.get(jobConf);
- JobConf jobConf = new JobConf(SlopeOnePrefsToDiffsJob.class);
+ Path prefsFilePath = new Path(prefsFile).makeQualified(fs);
+ Path outputPathPath = new Path(outputPath).makeQualified(fs);
- FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
if (fs.exists(outputPathPath)) {
fs.delete(outputPathPath, true);
}
- jobConf.setInputFormat(SequenceFileInputFormat.class);
- FileInputFormat.setInputPaths(jobConf, prefsFilePath);
+ jobConf.setClass("mapred.input.format.class", SequenceFileInputFormat.class, InputFormat.class);
+ jobConf.set("mapred.input.dir", StringUtils.escapeString(prefsFilePath.toString()));
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setMapOutputKeyClass(ItemItemWritable.class);
- jobConf.setMapOutputValueClass(DoubleWritable.class);
-
- jobConf.setReducerClass(SlopeOneDiffsToAveragesReducer.class);
- jobConf.setOutputKeyClass(ItemItemWritable.class);
- jobConf.setOutputValueClass(DoubleWritable.class);
+ jobConf.setClass("mapred.mapper.class", Mapper.class, Mapper.class);
+ jobConf.setClass("mapred.mapoutput.key.class", ItemItemWritable.class, Object.class);
+ jobConf.setClass("mapred.mapoutput.value.class", DoubleWritable.class, Object.class);
+
+ jobConf.setClass("mapred.reducer.class", SlopeOneDiffsToAveragesReducer.class, Reducer.class);
+ jobConf.setClass("mapred.output.key.class", ItemItemWritable.class, Object.class);
+ jobConf.setClass("mapred.output.value.class", DoubleWritable.class, Object.class);
- jobConf.setOutputFormat(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+ jobConf.setClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class);
+ jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
return jobConf;
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java Wed Jul 15 19:53:38 2009
@@ -18,30 +18,23 @@
package org.apache.mahout.cf.taste.hadoop;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
-import java.util.Iterator;
public final class SlopeOneDiffsToAveragesReducer
- extends MapReduceBase
- implements Reducer<ItemItemWritable, DoubleWritable, ItemItemWritable, DoubleWritable> {
+ extends Reducer<ItemItemWritable, DoubleWritable, ItemItemWritable, DoubleWritable> {
@Override
- public void reduce(ItemItemWritable key,
- Iterator<DoubleWritable> values,
- OutputCollector<ItemItemWritable, DoubleWritable> output,
- Reporter reporter) throws IOException {
+ protected void reduce(ItemItemWritable key, Iterable<DoubleWritable> values, Context context)
+ throws IOException, InterruptedException {
int count = 0;
double total = 0.0;
- while (values.hasNext()) {
- total += values.next().get();
+ for (DoubleWritable value : values) {
+ total += value.get();
count++;
}
- output.collect(key, new DoubleWritable((total / count)));
+ context.write(key, new DoubleWritable((total / count)));
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java Wed Jul 15 19:53:38 2009
@@ -17,60 +17,62 @@
package org.apache.mahout.cf.taste.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
-/**
- */
-public final class SlopeOnePrefsToDiffsJob {
- private SlopeOnePrefsToDiffsJob() {
+public final class SlopeOnePrefsToDiffsJob extends Job {
+
+ private SlopeOnePrefsToDiffsJob(Configuration jobConf) throws IOException {
+ super(jobConf);
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
String prefsFile = args[0];
String outputPath = args[1];
- JobConf jobConf = buildJobConf(prefsFile, outputPath);
- JobClient.runJob(jobConf);
+ Configuration jobConf = buildJobConf(prefsFile, outputPath);
+ Job job = new SlopeOnePrefsToDiffsJob(jobConf);
+ job.waitForCompletion(true);
}
- public static JobConf buildJobConf(String prefsFile,
- String outputPath) throws IOException {
+ public static Configuration buildJobConf(String prefsFile,
+ String outputPath) throws IOException {
- Path prefsFilePath = new Path(prefsFile);
- Path outputPathPath = new Path(outputPath);
+ Configuration jobConf = new Configuration();
+ FileSystem fs = FileSystem.get(jobConf);
- JobConf jobConf = new JobConf(SlopeOnePrefsToDiffsJob.class);
+ Path prefsFilePath = new Path(prefsFile).makeQualified(fs);
+ Path outputPathPath = new Path(outputPath).makeQualified(fs);
- FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
if (fs.exists(outputPathPath)) {
fs.delete(outputPathPath, true);
}
- jobConf.setInputFormat(TextInputFormat.class);
- FileInputFormat.setInputPaths(jobConf, prefsFilePath);
+ jobConf.setClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class);
+ jobConf.set("mapred.input.dir", StringUtils.escapeString(prefsFilePath.toString()));
+
+ jobConf.setClass("mapred.mapper.class", SlopeOnePrefsToDiffsMapper.class, Mapper.class);
+ jobConf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ jobConf.setClass("mapred.mapoutput.value.class", ItemPrefWritable.class, Object.class);
+
+ jobConf.setClass("mapred.reducer.class", SlopeOnePrefsToDiffsReducer.class, Reducer.class);
+ jobConf.setClass("mapred.output.key.class", ItemItemWritable.class, Object.class);
+ jobConf.setClass("mapred.output.value.class", DoubleWritable.class, Object.class);
- jobConf.setMapperClass(SlopeOnePrefsToDiffsMapper.class);
- jobConf.setMapOutputKeyClass(Text.class);
- jobConf.setMapOutputValueClass(ItemPrefWritable.class);
-
- jobConf.setReducerClass(SlopeOnePrefsToDiffsReducer.class);
- jobConf.setOutputKeyClass(ItemItemWritable.class);
- jobConf.setOutputValueClass(DoubleWritable.class);
-
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setOutputCompressionType(jobConf, SequenceFile.CompressionType.RECORD);
- FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+ jobConf.setClass("mapred.output.format.class", SequenceFileOutputFormat.class, OutputFormat.class);
+ jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
return jobConf;
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java Wed Jul 15 19:53:38 2009
@@ -19,30 +19,22 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
-/**
- */
public final class SlopeOnePrefsToDiffsMapper
- extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, ItemPrefWritable> {
+ extends Mapper<LongWritable, Text, Text, ItemPrefWritable> {
@Override
- public void map(LongWritable key,
- Text value,
- OutputCollector<Text, ItemPrefWritable> output,
- Reporter reporter) throws IOException {
+ protected void map(LongWritable key, Text value,
+ Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(",");
String userID = tokens[0];
String itemID = tokens[1];
double prefValue = Double.parseDouble(tokens[2]);
- output.collect(new Text(userID), new ItemPrefWritable(itemID, prefValue));
+ context.write(new Text(userID), new ItemPrefWritable(itemID, prefValue));
}
}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java Wed Jul 15 19:53:38 2009
@@ -19,29 +19,22 @@
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
public final class SlopeOnePrefsToDiffsReducer
- extends MapReduceBase
- implements Reducer<Text, ItemPrefWritable, ItemItemWritable, DoubleWritable> {
+ extends Reducer<Text, ItemPrefWritable, ItemItemWritable, DoubleWritable> {
@Override
- public void reduce(Text key,
- Iterator<ItemPrefWritable> values,
- OutputCollector<ItemItemWritable, DoubleWritable> output,
- Reporter reporter) throws IOException {
+ protected void reduce(Text key, Iterable<ItemPrefWritable> values, Context context)
+ throws IOException, InterruptedException {
List<ItemPrefWritable> prefs = new ArrayList<ItemPrefWritable>();
- while (values.hasNext()) {
- prefs.add(new ItemPrefWritable(values.next()));
+ for (ItemPrefWritable value : values) {
+ prefs.add(new ItemPrefWritable(value));
}
Collections.sort(prefs, ByItemIDComparator.getInstance());
int size = prefs.size();
@@ -53,7 +46,7 @@
ItemPrefWritable second = prefs.get(j);
String itemBID = second.getItemID();
double itemBValue = second.getPrefValue();
- output.collect(new ItemItemWritable(itemAID, itemBID), new DoubleWritable(itemBValue - itemAValue));
+ context.write(new ItemItemWritable(itemAID, itemBID), new DoubleWritable(itemBValue - itemAValue));
}
}
}