Posted to commits@mahout.apache.org by sr...@apache.org on 2009/07/15 21:53:39 UTC

svn commit: r794382 - /lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/

Author: srowen
Date: Wed Jul 15 19:53:38 2009
New Revision: 794382

URL: http://svn.apache.org/viewvc?rev=794382&view=rev
Log:
Might still have some 0.2.0 issues but committing my migration

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java?rev=794382&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/IdentityReducer.java Wed Jul 15 19:53:38 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/** Copied from Hadoop 0.19. Replace when Hadoop 0.20+ makes Reducer non-abstract. */
+public class IdentityReducer<K, V> extends Reducer<K, V, K, V> {
+
+  @Override
+  protected void reduce(K key, Iterable<V> values, Context context
+  ) throws IOException, InterruptedException {
+    for (V value : values) {
+      context.write(key, value);
+    }
+  }
+
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderJob.java Wed Jul 15 19:53:38 2009
@@ -17,16 +17,18 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.mahout.cf.taste.recommender.Recommender;
 
 import java.io.IOException;
@@ -34,53 +36,48 @@
 /**
  * <p>This class configures and runs a {@link RecommenderMapper} using Hadoop.</p>
  *
- * <p>Command line arguments are:</p>
- * <ol>
- *  <li>Fully-qualified class name of {@link Recommender} to use to make recommendations.
- *   Note that it must have a constructor which takes a {@link org.apache.mahout.cf.taste.model.DataModel}
- *   argument.</li>
- *  <li>Number of recommendations to compute per user</li>
- *  <li>Location of a text file containing user IDs for which recommendations should be computed,
- *   one per line</li>
- *  <li>Location of a data model file containing preference data, suitable for use with
- *   {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
- *  <li>Output path where reducer output should go</li>
- * </ol>
+ * <p>Command line arguments are:</p> <ol> <li>Fully-qualified class name of {@link Recommender} to use to make
+ * recommendations. Note that it must have a constructor which takes a {@link org.apache.mahout.cf.taste.model.DataModel}
+ * argument.</li> <li>Number of recommendations to compute per user</li> <li>Location of a text file containing user IDs
+ * for which recommendations should be computed, one per line</li> <li>Location of a data model file containing
+ * preference data, suitable for use with {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
+ * <li>Output path where reducer output should go</li> </ol>
  *
  * <p>Example:</p>
  *
  * <p><code>org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender 10 path/to/users.txt
- *  path/to/data.csv path/to/reducerOutputDir 5</code></p>
- *
- * <p>TODO I am not a bit sure this works yet in a real distributed environment.</p>
+ * path/to/data.csv path/to/reducerOutputDir 5</code></p>
  */
-public final class RecommenderJob {
-  private RecommenderJob() {
+public final class RecommenderJob extends Job {
+
+  public RecommenderJob(Configuration jobConf) throws IOException {
+    super(jobConf);
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
     String recommendClassName = args[0];
     int recommendationsPerUser = Integer.parseInt(args[1]);
     String userIDFile = args[2];
     String dataModelFile = args[3];
     String outputPath = args[4];
-    JobConf jobConf =
+    Configuration jobConf =
         buildJobConf(recommendClassName, recommendationsPerUser, userIDFile, dataModelFile, outputPath);
-    JobClient.runJob(jobConf);
+    Job job = new RecommenderJob(jobConf);
+    job.waitForCompletion(true);
   }
 
-  public static JobConf buildJobConf(String recommendClassName,
-                                     int recommendationsPerUser,
-                                     String userIDFile,
-                                     String dataModelFile,
-                                     String outputPath) throws IOException {
+  public static Configuration buildJobConf(String recommendClassName,
+                                           int recommendationsPerUser,
+                                           String userIDFile,
+                                           String dataModelFile,
+                                           String outputPath) throws IOException {
 
-    Path userIDFilePath = new Path(userIDFile);
-    Path outputPathPath = new Path(outputPath);
+    Configuration jobConf = new Configuration();
+    FileSystem fs = FileSystem.get(jobConf);
 
-    JobConf jobConf = new JobConf(RecommenderJob.class);
+    Path userIDFilePath = new Path(userIDFile).makeQualified(fs);
+    Path outputPathPath = new Path(outputPath).makeQualified(fs);
 
-    FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
     if (fs.exists(outputPathPath)) {
       fs.delete(outputPathPath, true);
     }
@@ -89,19 +86,19 @@
     jobConf.set(RecommenderMapper.RECOMMENDATIONS_PER_USER, String.valueOf(recommendationsPerUser));
     jobConf.set(RecommenderMapper.DATA_MODEL_FILE, dataModelFile);
 
-    jobConf.setInputFormat(TextInputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, userIDFilePath);
+    jobConf.setClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class);
+    jobConf.set("mapred.input.dir", StringUtils.escapeString(userIDFilePath.toString()));
 
-    jobConf.setMapperClass(RecommenderMapper.class);
-    jobConf.setMapOutputKeyClass(Text.class);
-    jobConf.setMapOutputValueClass(RecommendedItemsWritable.class);
-
-    jobConf.setReducerClass(IdentityReducer.class);
-    jobConf.setOutputKeyClass(Text.class);
-    jobConf.setOutputValueClass(RecommendedItemsWritable.class);
+    jobConf.setClass("mapred.mapper.class", RecommenderMapper.class, Mapper.class);
+    jobConf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+    jobConf.setClass("mapred.mapoutput.value.class", RecommendedItemsWritable.class, Object.class);
+
+    jobConf.setClass("mapred.reducer.class", IdentityReducer.class, Reducer.class);
+    jobConf.setClass("mapred.output.key.class", Text.class, Object.class);
+    jobConf.setClass("mapred.output.value.class", RecommendedItemsWritable.class, Object.class);
 
-    jobConf.setOutputFormat(TextOutputFormat.class);
-    FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+    jobConf.setClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class);
+    jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
 
     return jobConf;
   }
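
The migrated buildJobConf() drives the new API through raw configuration keys ("mapred.mapper.class" and friends). For comparison only, and not what this commit does, the same wiring could be expressed through the org.apache.hadoop.mapreduce.Job setters and the lib.input/lib.output helpers. A minimal sketch, with a made-up driver class name and placeholder paths:

    package org.apache.mahout.cf.taste.hadoop;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Hypothetical driver, not part of this commit.
    public final class RecommenderJobDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "RecommenderJob");
        job.setJarByClass(RecommenderJobDriverSketch.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("path/to/users.txt"));          // placeholder

        job.setMapperClass(RecommenderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(RecommendedItemsWritable.class);

        job.setReducerClass(IdentityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(RecommendedItemsWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("path/to/reducerOutputDir")); // placeholder

        // The mapper's own settings (recommender class name, data model file,
        // recommendations per user) would still be set on job.getConfiguration(),
        // as buildJobConf() above does.
        job.waitForCompletion(true);
      }
    }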

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/RecommenderMapper.java Wed Jul 15 19:53:38 2009
@@ -17,15 +17,12 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.cf.taste.common.TasteException;
 import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
 import org.apache.mahout.cf.taste.model.DataModel;
@@ -39,20 +36,18 @@
 import java.util.List;
 
 /**
- * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and
- * for each unique user ID, computes recommendations with the configured {@link Recommender}.
- * The results are output as {@link RecommendedItemsWritable}.</p>
+ * <p>The {@link Mapper} which takes as input a file of user IDs (treated as Strings, note), and for each unique user
+ * ID, computes recommendations with the configured {@link Recommender}. The results are output as {@link
+ * RecommendedItemsWritable}.</p>
  *
- * <p>Note that there is no corresponding {@link org.apache.hadoop.mapred.Reducer}; this
- * implementation can only partially take advantage of the mapreduce paradigm and only
- * really leverages it for easy parallelization. Therefore, use the
- * {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
+ * <p>Note that there is no corresponding {@link org.apache.hadoop.mapred.Reducer}; this implementation can only
+ * partially take advantage of the mapreduce paradigm and only really leverages it for easy parallelization. Therefore,
+ * use the {@link org.apache.hadoop.mapred.lib.IdentityReducer} when running this on Hadoop.</p>
  *
  * @see RecommenderJob
  */
 public final class RecommenderMapper
-    extends MapReduceBase
-    implements Mapper<LongWritable, Text, Text, RecommendedItemsWritable> {
+    extends Mapper<LongWritable, Text, Text, RecommendedItemsWritable> {
 
   static final String RECOMMENDER_CLASS_NAME = "recommenderClassName";
   static final String RECOMMENDATIONS_PER_USER = "recommendationsPerUser";
@@ -62,10 +57,8 @@
   private int recommendationsPerUser;
 
   @Override
-  public void map(LongWritable key,
-                  Text value,
-                  OutputCollector<Text, RecommendedItemsWritable> output,
-                  Reporter reporter) throws IOException {
+  protected void map(LongWritable key, Text value,
+                     Context context) throws IOException, InterruptedException {
     String userID = value.toString();
     List<RecommendedItem> recommendedItems;
     try {
@@ -74,13 +67,14 @@
       throw new RuntimeException(te);
     }
     RecommendedItemsWritable writable = new RecommendedItemsWritable(recommendedItems);
-    output.collect(new Text(userID), writable);
-    reporter.incrCounter(ReducerMetrics.USERS_PROCESSED, 1L);
-    reporter.incrCounter(ReducerMetrics.RECOMMENDATIONS_MADE, recommendedItems.size());
+    context.write(new Text(userID), writable);
+    context.getCounter(ReducerMetrics.USERS_PROCESSED).increment(1L);
+    context.getCounter(ReducerMetrics.RECOMMENDATIONS_MADE).increment(recommendedItems.size());
   }
 
   @Override
-  public void configure(JobConf jobConf) {
+  protected void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
     String dataModelFile = jobConf.get(DATA_MODEL_FILE);
     String recommenderClassName = jobConf.get(RECOMMENDER_CLASS_NAME);
     FileDataModel fileDataModel;
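
The rest of setup() is outside this hunk. Purely for illustration, one way the configured Recommender could be created, given the documented convention that it exposes a constructor taking a DataModel; the local path is a made-up placeholder (the real code may stage the Hadoop-side file differently), and exception handling is omitted:

    // Hypothetical sketch, not the committed code.
    DataModel dataModel = new FileDataModel(new File("/tmp/data.csv"));   // placeholder local copy
    Recommender recommender = (Recommender) Class.forName(recommenderClassName)
        .getConstructor(DataModel.class)
        .newInstance(dataModel);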

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesJob.java Wed Jul 15 19:53:38 2009
@@ -17,58 +17,61 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 
-/**
- */
-public final class SlopeOneDiffsToAveragesJob {
-  private SlopeOneDiffsToAveragesJob() {
+public final class SlopeOneDiffsToAveragesJob extends Job {
+
+  private SlopeOneDiffsToAveragesJob(Configuration jobConf) throws IOException {
+    super(jobConf);
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
     String prefsFile = args[0];
     String outputPath = args[1];
-    JobConf jobConf = buildJobConf(prefsFile, outputPath);
-    JobClient.runJob(jobConf);
+    Configuration jobConf = buildJobConf(prefsFile, outputPath);
+    Job job = new SlopeOneDiffsToAveragesJob(jobConf);
+    job.waitForCompletion(true);
   }
 
-  public static JobConf buildJobConf(String prefsFile,
-                                     String outputPath) throws IOException {
+  public static Configuration buildJobConf(String prefsFile,
+                                           String outputPath) throws IOException {
 
-    Path prefsFilePath = new Path(prefsFile);
-    Path outputPathPath = new Path(outputPath);
+    Configuration jobConf = new Configuration();
+    FileSystem fs = FileSystem.get(jobConf);
 
-    JobConf jobConf = new JobConf(SlopeOnePrefsToDiffsJob.class);
+    Path prefsFilePath = new Path(prefsFile).makeQualified(fs);
+    Path outputPathPath = new Path(outputPath).makeQualified(fs);
 
-    FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
     if (fs.exists(outputPathPath)) {
       fs.delete(outputPathPath, true);
     }
 
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, prefsFilePath);
+    jobConf.setClass("mapred.input.format.class", SequenceFileInputFormat.class, InputFormat.class);
+    jobConf.set("mapred.input.dir", StringUtils.escapeString(prefsFilePath.toString()));
 
-    jobConf.setMapperClass(IdentityMapper.class);
-    jobConf.setMapOutputKeyClass(ItemItemWritable.class);
-    jobConf.setMapOutputValueClass(DoubleWritable.class);
-
-    jobConf.setReducerClass(SlopeOneDiffsToAveragesReducer.class);
-    jobConf.setOutputKeyClass(ItemItemWritable.class);
-    jobConf.setOutputValueClass(DoubleWritable.class);
+    jobConf.setClass("mapred.mapper.class", Mapper.class, Mapper.class);
+    jobConf.setClass("mapred.mapoutput.key.class", ItemItemWritable.class, Object.class);
+    jobConf.setClass("mapred.mapoutput.value.class", DoubleWritable.class, Object.class);
+
+    jobConf.setClass("mapred.reducer.class", SlopeOneDiffsToAveragesReducer.class, Reducer.class);
+    jobConf.setClass("mapred.output.key.class", ItemItemWritable.class, Object.class);
+    jobConf.setClass("mapred.output.value.class", DoubleWritable.class, Object.class);
 
-    jobConf.setOutputFormat(TextOutputFormat.class);
-    FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+    jobConf.setClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class);
+    jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
 
     return jobConf;
   }
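
Note that where the old-API version used org.apache.hadoop.mapred.lib.IdentityMapper, the migration registers the base org.apache.hadoop.mapreduce.Mapper class itself as the mapper: in the new API the default map() is already an identity pass-through, effectively

    protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }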

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOneDiffsToAveragesReducer.java Wed Jul 15 19:53:38 2009
@@ -18,30 +18,23 @@
 package org.apache.mahout.cf.taste.hadoop;
 
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 public final class SlopeOneDiffsToAveragesReducer
-    extends MapReduceBase
-    implements Reducer<ItemItemWritable, DoubleWritable, ItemItemWritable, DoubleWritable> {
+    extends Reducer<ItemItemWritable, DoubleWritable, ItemItemWritable, DoubleWritable> {
 
   @Override
-  public void reduce(ItemItemWritable key,
-                     Iterator<DoubleWritable> values,
-                     OutputCollector<ItemItemWritable, DoubleWritable> output,
-                     Reporter reporter) throws IOException {
+  protected void reduce(ItemItemWritable key, Iterable<DoubleWritable> values, Context context)
+      throws IOException, InterruptedException {
     int count = 0;
     double total = 0.0;
-    while (values.hasNext()) {
-      total += values.next().get();
+    for (DoubleWritable value : values) {
+      total += value.get();
       count++;
     }
-    output.collect(key, new DoubleWritable((total / count)));
+    context.write(key, new DoubleWritable((total / count)));
   }
 
 }
\ No newline at end of file
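
The reducer collapses all of the per-user diffs collected for one item pair into their mean. A worked example with made-up numbers:

    // If the diffs received for the pair (itemA, itemB) are 2.0, 1.5 and 2.5,
    // the reducer writes:
    //   (itemA, itemB) -> 2.0      // (2.0 + 1.5 + 2.5) / 3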

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsJob.java Wed Jul 15 19:53:38 2009
@@ -17,60 +17,62 @@
 
 package org.apache.mahout.cf.taste.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 
-/**
- */
-public final class SlopeOnePrefsToDiffsJob {
-  private SlopeOnePrefsToDiffsJob() {
+public final class SlopeOnePrefsToDiffsJob extends Job {
+
+  private SlopeOnePrefsToDiffsJob(Configuration jobConf) throws IOException {
+    super(jobConf);
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
     String prefsFile = args[0];
     String outputPath = args[1];
-    JobConf jobConf = buildJobConf(prefsFile, outputPath);
-    JobClient.runJob(jobConf);
+    Configuration jobConf = buildJobConf(prefsFile, outputPath);
+    Job job = new SlopeOnePrefsToDiffsJob(jobConf);
+    job.waitForCompletion(true);
   }
 
-  public static JobConf buildJobConf(String prefsFile,
-                                     String outputPath) throws IOException {
+  public static Configuration buildJobConf(String prefsFile,
+                                           String outputPath) throws IOException {
 
-    Path prefsFilePath = new Path(prefsFile);
-    Path outputPathPath = new Path(outputPath);
+    Configuration jobConf = new Configuration();
+    FileSystem fs = FileSystem.get(jobConf);
 
-    JobConf jobConf = new JobConf(SlopeOnePrefsToDiffsJob.class);
+    Path prefsFilePath = new Path(prefsFile).makeQualified(fs);
+    Path outputPathPath = new Path(outputPath).makeQualified(fs);
 
-    FileSystem fs = FileSystem.get(outputPathPath.toUri(), jobConf);
     if (fs.exists(outputPathPath)) {
       fs.delete(outputPathPath, true);
     }
 
-    jobConf.setInputFormat(TextInputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, prefsFilePath);
+    jobConf.setClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class);
+    jobConf.set("mapred.input.dir", StringUtils.escapeString(prefsFilePath.toString()));
+
+    jobConf.setClass("mapred.mapper.class", SlopeOnePrefsToDiffsMapper.class, Mapper.class);
+    jobConf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+    jobConf.setClass("mapred.mapoutput.value.class", ItemPrefWritable.class, Object.class);
+
+    jobConf.setClass("mapred.reducer.class", SlopeOnePrefsToDiffsReducer.class, Reducer.class);
+    jobConf.setClass("mapred.output.key.class", ItemItemWritable.class, Object.class);
+    jobConf.setClass("mapred.output.value.class", DoubleWritable.class, Object.class);
 
-    jobConf.setMapperClass(SlopeOnePrefsToDiffsMapper.class);
-    jobConf.setMapOutputKeyClass(Text.class);
-    jobConf.setMapOutputValueClass(ItemPrefWritable.class);
-
-    jobConf.setReducerClass(SlopeOnePrefsToDiffsReducer.class);
-    jobConf.setOutputKeyClass(ItemItemWritable.class);
-    jobConf.setOutputValueClass(DoubleWritable.class);
-
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    SequenceFileOutputFormat.setOutputCompressionType(jobConf, SequenceFile.CompressionType.RECORD);
-    FileOutputFormat.setOutputPath(jobConf, outputPathPath);
+    jobConf.setClass("mapred.output.format.class", SequenceFileOutputFormat.class, OutputFormat.class);
+    jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
 
     return jobConf;
   }
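
The two slope-one jobs form a pipeline: this one turns raw preferences into per-user item-item diffs written as a SequenceFile, and SlopeOneDiffsToAveragesJob then averages those diffs. One way to chain them, sketched with placeholder paths, is simply to invoke the two entry points in order:

    // Illustrative driver; paths are placeholders.
    SlopeOnePrefsToDiffsJob.main(new String[] {"path/to/prefs.csv", "path/to/diffs"});
    SlopeOneDiffsToAveragesJob.main(new String[] {"path/to/diffs", "path/to/averages"});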

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsMapper.java Wed Jul 15 19:53:38 2009
@@ -19,30 +19,22 @@
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.IOException;
 
-/**
- */
 public final class SlopeOnePrefsToDiffsMapper
-    extends MapReduceBase
-    implements Mapper<LongWritable, Text, Text, ItemPrefWritable> {
+    extends Mapper<LongWritable, Text, Text, ItemPrefWritable> {
 
   @Override
-  public void map(LongWritable key,
-                  Text value,
-                  OutputCollector<Text, ItemPrefWritable> output,
-                  Reporter reporter) throws IOException {
+  protected void map(LongWritable key, Text value,
+                     Context context) throws IOException, InterruptedException {
     String line = value.toString();
     String[] tokens = line.split(",");
     String userID = tokens[0];
     String itemID = tokens[1];
     double prefValue = Double.parseDouble(tokens[2]);
-    output.collect(new Text(userID), new ItemPrefWritable(itemID, prefValue));
+    context.write(new Text(userID), new ItemPrefWritable(itemID, prefValue));
   }
 
 }
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java?rev=794382&r1=794381&r2=794382&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/SlopeOnePrefsToDiffsReducer.java Wed Jul 15 19:53:38 2009
@@ -19,29 +19,22 @@
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 public final class SlopeOnePrefsToDiffsReducer
-    extends MapReduceBase
-    implements Reducer<Text, ItemPrefWritable, ItemItemWritable, DoubleWritable> {
+    extends Reducer<Text, ItemPrefWritable, ItemItemWritable, DoubleWritable> {
 
   @Override
-  public void reduce(Text key,
-                     Iterator<ItemPrefWritable> values,
-                     OutputCollector<ItemItemWritable, DoubleWritable> output,
-                     Reporter reporter) throws IOException {
+  protected void reduce(Text key, Iterable<ItemPrefWritable> values, Context context)
+      throws IOException, InterruptedException {
     List<ItemPrefWritable> prefs = new ArrayList<ItemPrefWritable>();
-    while (values.hasNext()) {
-      prefs.add(new ItemPrefWritable(values.next()));
+    for (ItemPrefWritable value : values) {
+      prefs.add(new ItemPrefWritable(value));
     }
     Collections.sort(prefs, ByItemIDComparator.getInstance());
     int size = prefs.size();
@@ -53,7 +46,7 @@
         ItemPrefWritable second = prefs.get(j);
         String itemBID = second.getItemID();
         double itemBValue = second.getPrefValue();
-        output.collect(new ItemItemWritable(itemAID, itemBID), new DoubleWritable(itemBValue - itemAValue));
+        context.write(new ItemItemWritable(itemAID, itemBID), new DoubleWritable(itemBValue - itemAValue));
       }
     }
   }
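
Taken together with SlopeOnePrefsToDiffsMapper, which splits each "userID,itemID,prefValue" line and emits the preference keyed by user, this reducer sorts one user's preferences by item ID and writes a diff for each ordered item pair. A worked example with made-up data:

    // Mapper output for user "u1": itemA=3.0, itemB=5.0, itemC=2.0
    // After sorting by item ID, the reducer writes (later item minus earlier item):
    //   (itemA, itemB) ->  2.0     // 5.0 - 3.0
    //   (itemA, itemC) -> -1.0     // 2.0 - 3.0
    //   (itemB, itemC) -> -3.0     // 2.0 - 5.0
    // SlopeOneDiffsToAveragesJob then averages these diffs over all users.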