Posted to user@mahout.apache.org by Laszlo Dosa <la...@fredhopper.com> on 2010/07/05 18:30:10 UTC

RE: DistributedLanczosSolver input

Cool! Thanks!

-----Original Message-----
From: Jake Mannix [mailto:jake.mannix@gmail.com] 
Sent: woensdag 30 juni 2010 17:04
To: user@mahout.apache.org
Subject: Re: DistributedLanczosSolver input

SequentialAccessSparseVector does not implement Writable.  You need a
Reducer<IntWritable,IntWritable,IntWritable,VectorWritable>.  In this
reducer you do everything you are currently doing, except that instead of
writing the SequentialAccessSparseVector directly (via Context's or
OutputCollector's write methods), you wrap it in a VectorWritable:

  SequentialAccessSparseVector v = // build it up
  // ...
  context.write(key, new VectorWritable(v));

This should fix your current problem.
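
For illustration, a minimal sketch of such a reducer could look like the
following (the UserVectorReducer name and the Integer.MAX_VALUE cardinality
are just placeholders, not anything the solver requires):

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.mahout.math.SequentialAccessSparseVector;
  import org.apache.mahout.math.VectorWritable;

  // Sketch: emit one row of the user/item matrix per user, wrapped in a VectorWritable.
  public class UserVectorReducer
      extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

    @Override
    public void reduce(IntWritable userId, Iterable<IntWritable> itemIds, Context context)
        throws IOException, InterruptedException {
      // index = itemId, value = number of times this user saw that item.
      SequentialAccessSparseVector row = new SequentialAccessSparseVector(Integer.MAX_VALUE);
      for (IntWritable itemId : itemIds) {
        int index = itemId.get();
        row.set(index, row.get(index) + 1);
      }
      // Wrap the vector so Hadoop can serialize it.
      context.write(userId, new VectorWritable(row));
    }
  }

In the driver you would then also set the output value class to
VectorWritable.class and write to a SequenceFile so the solver can read
the rows back.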

Another thing you should be careful of is that if your itemIds and userIds
are of type long, they need to be somehow turned into int.  The keys on
vector types currently in Mahout are restricted to 32 bits.
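
If your ids really are longs, one rough way to fold them down is something
like this (just a sketch; collisions are possible, so a real job might keep
an explicit id-to-index dictionary instead):

  // Fold a long id into a non-negative int index (Long.hashCode-style fold,
  // sign bit cleared).  Not collision-free; a lookup table is safer.
  static int toIndex(long id) {
    return (int) (id ^ (id >>> 32)) & Integer.MAX_VALUE;
  }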

  -jake

On Wed, Jun 30, 2010 at 10:28 AM, Laszlo Dosa <la...@fredhopper.com> wrote:

> Hi,
>
> I am trying to run
> org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver.
> My input looks like (userid, itemid) pairs as follows:
> ...
> 122641863,5060057723326
> 123441107,9789020282948
> ...
>
> How can I transform my input to the format that DistributedLanczosSolver
> needs (rows = users, columns=items, elements=number of items/user)?
>
> I tried to write a MapReduce job with a Mapper<Object, Text, IntWritable,
> IntWritable> that maps each row to userid as key and itemid as value,
> and a
> Reducer<IntWritable,IntWritable,IntWritable,SequentialAccessSparseVector>
> that emits the userid as key and a SequentialAccessSparseVector with
> itemid as index and the number of occurrences of that itemid as value.
>
> I am getting this exception with the attached code:
>
> 2010-06-29 09:04:59,172 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
> java.lang.NullPointerException
>        at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:759)
>        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:487)
>        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:575)
>        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
>        at org.apache.hadoop.mapred.Child.main(Child.java:170)
>
>
> Can you suggest any other way?
>
> Regards,
> Laszlo
>
>
>
> import java.io.IOException;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.mahout.math.SequentialAccessSparseVector;
>
> public class UserItemMatrix {
>
>     public static class TokenizerMapper
>             extends Mapper<Object, Text, IntWritable, IntWritable> {
>
>         private final static String DELIMITER = ",";
>
>         public void map(Object key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             String[] values = DELIMITER.split(value.toString());
>             IntWritable userId = new IntWritable(Integer.parseInt(values[0]));
>             IntWritable itemId = new IntWritable(Integer.parseInt(values[1]));
>             context.write(userId, itemId);
>         }
>     }
>
>     public static class ItemReducer extends
>             Reducer<IntWritable, IntWritable, IntWritable, SequentialAccessSparseVector> {
>
>         private SequentialAccessSparseVector vector = new SequentialAccessSparseVector();
>
>         public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
>                 throws IOException, InterruptedException {
>             Map<Integer, Integer> cooccurence = new HashMap<Integer, Integer>();
>             for (IntWritable val : values) {
>                 int num = cooccurence.get(val.get());
>                 num++;
>                 cooccurence.put(val.get(), num);
>             }
>             for (Map.Entry<Integer, Integer> entry : cooccurence.entrySet()) {
>                 vector.set(entry.getKey(), entry.getValue());
>             }
>             context.write(key, vector);
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>         if (otherArgs.length != 2) {
>             System.err.println("Usage: User Item cooccurence matrix <in> <out>");
>             System.exit(2);
>         }
>
>         Job job = new Job(conf, "User Item cooccurence matrix");
>         job.setJarByClass(UserItemMatrix.class);
>         job.setMapperClass(TokenizerMapper.class);
>         job.setCombinerClass(ItemReducer.class);
>         job.setReducerClass(ItemReducer.class);
>         job.setOutputKeyClass(IntWritable.class);
>         job.setOutputValueClass(SequentialAccessSparseVector.class);
>         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
>         System.exit(job.waitForCompletion(true) ? 0 : 1);
>     }
> }
>
>
>
>