Posted to common-user@hadoop.apache.org by Saptarshi Guha <sa...@gmail.com> on 2009/01/18 02:06:35 UTC

Sorting on several columns using KeyFieldSeparator and Partitioner

Hello,
I have a file with n columns, some of which are text and some numeric.
Given a sequence of indices, I would like to sort on those indices, i.e.
first on Index1, then within that on Index2, and so on.
In the example code below I have 3 space-separated columns: numeric,
text, numeric. I want to sort on column 2 (reverse), then on column 1
(reverse, numeric), and lastly on column 3.
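(For concreteness, with hypothetical rows "1 b 2", "3 b 1" and "2 a 5",
the order I want is "3 b 1", then "1 b 2", then "2 a 5" - the same
ordering GNU sort would give with: sort -t' ' -k2,2r -k1,1rn -k3,3n)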

Though my code runs locally, it gives wrong results: column 2 is sorted
in reverse, and within that column 3 (which is treated as text), and
then column 1. When run distributed, I get a merge error - my guess is
that fixing the latter fixes the former.

This is the error:
java.io.IOException: Final merge failed
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2093)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:457)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
        at org.apache.hadoop.mapred.Child.main(Child.java:155)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 562
        at org.apache.hadoop.io.WritableComparator.compareBytes(WritableComparator.java:128)
        at org.apache.hadoop.mapred.lib.KeyFieldBasedComparator.compareByteSequence(KeyFieldBasedComparator.java:109)
        at org.apache.hadoop.mapred.lib.KeyFieldBasedComparator.compare(KeyFieldBasedComparator.java:85)
        at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:308)
        at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:144)
        at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
        at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:270)
        at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:285)
        at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:108)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2087)
        ... 3 more


Thanks for your time

And the code (not too big) is
==CODE==

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.util.Tool;

public class RMRSort extends Configured implements Tool {

    // Columns in the input are space separated.
    static final String fsep = " ";

    static class RMRSortMap extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        // Emit the whole line as both key and value so the key-field-based
        // partitioner and comparator can see every column.
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(value, value);
        }
    }

    static class RMRSortReduce extends MapReduceBase
            implements Reducer<Text, Text, NullWritable, Text> {

        // Drop the key and write the (already sorted) lines back out.
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            NullWritable n = NullWritable.get();
            while (values.hasNext())
                output.collect(n, values.next());
        }
    }

    static JobConf createConf(String rserveport, String uid,
            String infolder, String outfolder) {
        Configuration defaults = new Configuration();
        JobConf jobConf = new JobConf(defaults, RMRSort.class);
        jobConf.setJobName("Sorter: " + uid);
        jobConf.addResource(new Path(System.getenv("HADOOP_CONF_DIR")
                + "/hadoop-site.xml"));
        // jobConf.set("mapred.job.tracker", "local");
        jobConf.setMapperClass(RMRSortMap.class);
        jobConf.setReducerClass(RMRSortReduce.class);
        jobConf.set("map.output.key.field.separator", fsep);
        jobConf.setPartitionerClass(KeyFieldBasedPartitioner.class);
        jobConf.set("mapred.text.key.partitioner.options", "-k2,2 -k1,1 -k3,3");
        jobConf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
        jobConf.set("mapred.text.key.comparator.options", "-k2r,2r -k1rn,1rn -k3n,3n");
        // infolder, outfolder information removed
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(NullWritable.class);
        return jobConf;
    }

    public int run(String[] args) throws Exception {
        return 1;
    }
}
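
(run() is just a stub above; a minimal driver sketch, assuming the usual
ToolRunner pattern with org.apache.hadoop.util.ToolRunner imported and
the method placed inside RMRSort:

public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic Hadoop options, then invokes run(args)
    System.exit(ToolRunner.run(new Configuration(), new RMRSort(), args));
}
)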




-- 
Saptarshi Guha - saptarshi.guha@gmail.com

Re: Sorting on several columns using KeyFieldSeparator and Partitioner

Posted by jason hadoop <ja...@gmail.com>.
You must have only 3 fields in your keys.
Try this - it is my best guess based on your code. Appendix A of my book has
a detailed discussion of these fields and their gotchas, and the example code
has test classes that allow you to try different keys with different input
to see how the parts are actually broken out of the keys.

jobConf.set("mapred.text.key.partitioner.options","-k2.2 -k1.1 -k3.3");
jobConf.set("mapred.text.key.comparator.options","-k2.2r -k1.1rn -k3.3n");


On Tue, May 5, 2009 at 2:05 AM, Min Zhou <co...@gmail.com> wrote:

> I came across the same failure.  Can anyone solve this problem?
>
> [quoted original message trimmed]



-- 
Alpha Chapters of my book on Hadoop are available
http://www.apress.com/book/view/9781430219422
www.prohadoopbook.com a community for Hadoop Professionals

Re: Sorting on several columns using KeyFieldSeparator and Partitioner

Posted by Min Zhou <co...@gmail.com>.
I came across the same failure.  Can anyone solve this problem?

On Sun, Jan 18, 2009 at 9:06 AM, Saptarshi Guha <sa...@gmail.com> wrote:

> [quoted original message trimmed]



-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com