Posted to mapreduce-user@hadoop.apache.org by Piyush Kansal <pi...@gmail.com> on 2012/02/20 03:08:33 UTC

Query regarding Hadoop Partitioning

Hi Friends,

I have to sort a huge amount of data in the minimum possible time, probably
using partitioning. The key is composed of 3 fields (partition, text and
number). This is how the partitions are defined:

   - Partition "1" for range 1-10
   - Partition "2" for range 11-20
   - Partition "3" for range 21-30

*I/P file format*: partition[tab]text[tab]range-start[tab]range-end

[cloudera@localhost kMer2]$ cat input1

   - 1 chr1 1 10
   - 1 chr1 2 8
   - 2 chr1 11 18

[cloudera@localhost kMer2]$ cat input2

   - 1 chr1 3 7
   - 2 chr1 12 19

[cloudera@localhost kMer2]$ cat input3

   - 3 chr1 22 30

[cloudera@localhost kMer2]$ cat input4

   - 3 chr1 22 30
   - 1 chr1 9 10
   - 2 chr1 15 16

Then I ran the following command:

hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
-D stream.map.output.field.separator='\t' \
-D stream.num.map.output.key.fields=3 \
-D map.output.key.field.separator='\t' \
-D mapred.text.key.partitioner.options=-k1 \
-D mapred.reduce.tasks=3 \
-input /usr/pkansal/kMer2/ip \
-output /usr/pkansal/kMer2/op \
-mapper /home/cloudera/kMer2/kMer2Map.py \
-file /home/cloudera/kMer2/kMer2Map.py \
-reducer /home/cloudera/kMer2/kMer2Red.py \
-file /home/cloudera/kMer2/kMer2Red.py

Both the mapper and the reducer scripts contain the same few lines of code:

import sys

# Pass every input line through unchanged, minus surrounding whitespace
for line in sys.stdin:
    line = line.strip()
    print("%s" % line)

The following is the output:

[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00000

   - 2 chr1 12 19
   - 2 chr1 15 16
   - 3 chr1 22 30
   - 3 chr1 22 30

[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00001

   - 1 chr1 2 8
   - 1 chr1 3 7
   - 1 chr1 9 10
   - 2 chr1 11 18

[cloudera@localhost kMer2]$ hadoop dfs -cat /usr/pkansal/kMer2/op/part-00002

   - 1 chr1 1 10
   - 3 chr1 22 29

This is not the output I expected. I expected all records with:

   - partition 1 in one single file, e.g. part-m-00000
   - partition 2 in one single file, e.g. part-m-00001
   - partition 3 in one single file, e.g. part-m-00002

Can you please tell me whether I am doing this the right way?
-- 
Regards,
Piyush Kansal
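
The layout asked for above depends on the reducer being chosen from the first key field alone. Below is a minimal sketch of that mapping in plain Java, with no Hadoop dependency. The class name FirstFieldPartitionSketch and its methods are hypothetical, and the rule "label N goes to reducer N-1" is an assumption made for illustration. It is not the partitioner the poster eventually wrote, and not Hadoop's own hashing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Hadoop: mirrors the decision a custom
// partitioner could make for keys shaped like "partition<TAB>text<TAB>number".
class FirstFieldPartitionSketch {

    // Maps partition labels "1", "2", "3", ... to reducer indexes 0, 1, 2, ...
    // Only the first tab-separated field is inspected.
    static int partitionFor(String key, int numReduceTasks) {
        String firstField = key.split("\t", 2)[0];
        int label = Integer.parseInt(firstField.trim());
        // floorMod keeps the result in [0, numReduceTasks) even for labels < 1
        return Math.floorMod(label - 1, numReduceTasks);
    }

    // Places each record in the bucket of the reducer that would receive it.
    static List<List<String>> distribute(List<String> records, int numReduceTasks) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String record : records) {
            buckets.get(partitionFor(record, numReduceTasks)).add(record);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // A few records in the thread's input format (tab-separated)
        List<String> records = Arrays.asList(
                "1\tchr1\t1\t10",
                "2\tchr1\t11\t18",
                "1\tchr1\t3\t7",
                "3\tchr1\t22\t30");
        List<List<String>> buckets = distribute(records, 3);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("reducer " + i + ": " + buckets.get(i));
        }
    }
}
```

Because the reducer index depends only on the first field, records that share a partition label cannot be split across output files, whatever the remaining key fields contain.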

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Thanks. It worked. My questions might be annoying to you, but I am quite new to Java.

On Fri, Feb 24, 2012 at 4:14 PM, Joey Echeverria <jo...@cloudera.com> wrote:

> It looks like your partitioner is an inner class. Try making it static:
>
> public static class MOPartition extends Partitioner<Text, Text>
>        public MOPartition() {}



-- 
Regards,
Piyush Kansal

Re: Query regarding Hadoop Partitioning

Posted by Joey Echeverria <jo...@cloudera.com>.
It looks like your partitioner is an inner class. Try making it static:

public static class MOPartition extends Partitioner<Text, Text> {
        public MOPartition() {}
        // ... existing getPartition(...) stays as it is
}
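
The NoSuchMethodException for globalSort$MOPartition.<init>() suggests the class was created reflectively through a no-argument constructor. A non-static inner class never has one, because its constructor implicitly takes the enclosing instance. The sketch below shows that difference in plain Java. The class names are hypothetical stand-ins that do not extend Hadoop's Partitioner, and the reflective lookup is assumed to resemble what the framework does rather than being copied from it.

```java
// Illustrative stand-ins only; nothing here depends on Hadoop.
class GlobalSortSketch {

    // Non-static inner class: the compiled constructor takes the enclosing
    // GlobalSortSketch instance as a hidden first parameter.
    class InnerPartition {
        public InnerPartition() {}
    }

    // Static nested class: has a genuine no-argument constructor.
    static class StaticPartition {
        public StaticPartition() {}
    }

    // Returns true if the class declares a constructor with zero parameters.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("inner class:  " + hasNoArgConstructor(InnerPartition.class));
        System.out.println("static class: " + hasNoArgConstructor(StaticPartition.class));
    }
}
```

The lookup fails for the inner class even though its source declares an empty constructor, which matches the symptom reported in the thread.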

On Fri, Feb 24, 2012 at 3:48 PM, Piyush Kansal <pi...@gmail.com> wrote:
> Hi,
>
> I am right now stuck with an issue while extending the Partitioner class:
>
> public class MOPartition extends Partitioner<Text, Text>
>         public MOPartition() {}
>
> java.lang.RuntimeException: java.lang.NoSuchMethodException:
> globalSort$MOPartition.<init>()
>
> I tried defining an empty constructor, but it still didn't help. My JRE
> version is 1.6.0.26.
>
> Can you please suggest what can be the issue?



-- 
Joseph Echeverria
Cloudera, Inc.
443.305.9434

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Hi,

I am right now stuck with an issue while extending the Partitioner class:

public class MOPartition extends Partitioner<Text, Text>
        public MOPartition() {}

java.lang.RuntimeException: java.lang.NoSuchMethodException:
globalSort$MOPartition.<init>()

I tried defining an empty constructor, but it still didn't help. My JRE
version is 1.6.0.26.

Can you please suggest what can be the issue?

On Mon, Feb 20, 2012 at 4:12 AM, Piyush Kansal <pi...@gmail.com> wrote:

> Thanks Harsh. I will try it and will get back to you.


-- 
Regards,
Piyush Kansal

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Thanks Harsh. I will try it and will get back to you.

On Mon, Feb 20, 2012 at 3:55 AM, Harsh J <ha...@cloudera.com> wrote:

> I do not think you can do it out of the box with streaming, but
> last.fm's Dumbo (highly recommended if you use Python M/R) and its
> add-on Feathers libraries can do it apparently.
>
> See Erik Forsberg's detailed answer (second) on
>
> http://stackoverflow.com/questions/1626786/generating-separate-output-files-in-hadoop-streaming
> for more.



-- 
Regards,
Piyush Kansal

Re: Query regarding Hadoop Partitioning

Posted by Harsh J <ha...@cloudera.com>.
I do not think you can do it out of the box with streaming, but
last.fm's Dumbo (highly recommended if you use Python M/R) and its
add-on Feathers libraries can do it apparently.

See Erik Forsberg's detailed answer (second) on
http://stackoverflow.com/questions/1626786/generating-separate-output-files-in-hadoop-streaming
for more.

On Mon, Feb 20, 2012 at 1:57 PM, Piyush Kansal <pi...@gmail.com> wrote:
> Thanks for the immediate reply Harsh. I will try using it.
>
> By the way, can't we achieve the same goal with Hadoop Streaming (using
> Python)?



-- 
Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Thanks for the immediate reply Harsh. I will try using it.

By the way, can't we achieve the same goal with Hadoop Streaming (using
Python)?

On Mon, Feb 20, 2012 at 2:59 AM, Harsh J <ha...@cloudera.com> wrote:

> Piyush,
>
> Yes. Currently the partitioned data is always sorted by (and then
> grouped by) keys before the reduce() calls begin.



-- 
Regards,
Piyush Kansal

Re: Query regarding Hadoop Partitioning

Posted by Harsh J <ha...@cloudera.com>.
Piyush,

Yes. Currently the partitioned data is always sorted by (and then
grouped by) keys before the reduce() calls begin.
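
As a rough local illustration of that ordering (plain Python, not Hadoop
code): the sample records come from the original post, the helper name
reduce_input is made up for this sketch, and keys are assumed to compare as
text. The records that land in one partition are first sorted by key and
then handed over one key group at a time:

```python
from itertools import groupby


def reduce_input(records, num_key_fields=3):
    """Sort tab-separated records by their key fields, then group equal keys.

    Hypothetical helper: it mimics, locally, the sort-then-group step that
    one partition goes through before reduce() is called.
    """
    def key_of(record):
        # The key is the first num_key_fields tab-separated fields, as text.
        return tuple(record.split("\t")[:num_key_fields])

    ordered = sorted(records, key=key_of)
    return [(key, list(group)) for key, group in groupby(ordered, key=key_of)]


# Records of one partition, in arrival order (taken from the sample input).
partition = [
    "3\tchr1\t22\t30",
    "1\tchr1\t9\t10",
    "1\tchr1\t2\t8",
    "3\tchr1\t22\t30",
]

for key, group in reduce_input(partition):
    print(key, len(group))
```

Because the comparison here is textual, a key field of "10" would sort
before "9"; whether that matters depends on the comparator the job uses.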

On Mon, Feb 20, 2012 at 12:51 PM, Piyush Kansal <pi...@gmail.com> wrote:
> Thanks Harsh.
>
> But will it also sort the data, as the Partitioner does?



-- 
Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Thanks Harsh.

But will it also sort the data, as the Partitioner does?

On Sun, Feb 19, 2012 at 10:54 PM, Harsh J <ha...@cloudera.com> wrote:

> Hi,
>
> You would find it easier to use the Java API's MultipleOutputs (and/or
> MultipleOutputFormat, which directly works on a configured key field),
> to write each key-partition out in its own file.



-- 
Regards,
Piyush Kansal

Re: Query regarding Hadoop Partitioning

Posted by Harsh J <ha...@cloudera.com>.
Hi,

You would find it easier to use the Java API's MultipleOutputs (and/or
MultipleOutputFormat, which directly works on a configured key field),
to write each key-partition out in its own file.
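
The idea can be sketched locally in plain Python. This is not the Java
MultipleOutputs API, only an illustration of choosing the output file from
the record's key instead of from the reducer number; the function name
write_by_partition and the partition-<n>.txt file names are assumptions
made for this sketch:

```python
import os
import tempfile
from collections import defaultdict


def write_by_partition(records, out_dir):
    """Write each record to a file named after its first tab-separated field.

    Hypothetical local stand-in for the idea behind MultipleOutputs: the
    output file is picked from the record's key, not from the reducer number.
    """
    buckets = defaultdict(list)
    for record in records:
        partition = record.split("\t", 1)[0]
        buckets[partition].append(record)

    paths = {}
    for partition, lines in buckets.items():
        path = os.path.join(out_dir, "partition-%s.txt" % partition)
        with open(path, "w") as handle:
            handle.write("\n".join(lines) + "\n")
        paths[partition] = path
    return paths


# A few records from the sample input, in mixed order.
records = [
    "1\tchr1\t1\t10",
    "2\tchr1\t11\t18",
    "1\tchr1\t3\t7",
    "3\tchr1\t22\t30",
]
out_dir = tempfile.mkdtemp()
paths = write_by_partition(records, out_dir)
for partition in sorted(paths):
    print(partition, os.path.basename(paths[partition]))
```

Each partition value ends up in exactly one file regardless of how many
writers there are, which is the property the original post was after.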




-- 
Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about

Re: Query regarding Hadoop Partitioning

Posted by Piyush Kansal <pi...@gmail.com>.
Thanks Utkarsh.

But I can't find such a function in Hadoop. Moreover, is there any reason why
default partitioning won't work? I mean, if it does not work, then why is it
even there? Maybe I am missing something.

On Sun, Feb 19, 2012 at 10:40 PM, Utkarsh Gupta
<Ut...@infosys.com> wrote:

> Hi Piyush,
>
> I think you need to override the built-in partitioning function.
> You can use a function like (first field of key) % 3.
> This will send all keys with the same first field to the same reduce
> process, separate from the other partitions.
> Please correct me if I am wrong.
>
> Thanks
> Utkarsh


-- 
Regards,
Piyush Kansal

RE: Query regarding Hadoop Partitioning

Posted by Utkarsh Gupta <Ut...@infosys.com>.
Hi Piyush,

I think you need to override the built-in partitioning function.
You can use a function like (first field of key) % 3.
This will send all keys with the same first field to the same reduce process,
separate from the other partitions.
Please correct me if I am wrong.
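
A minimal sketch of that function in plain Python, assuming the key is
tab-separated and its first field is an integer. The name partition_for is
hypothetical and is not part of any Hadoop API:

```python
def partition_for(key, num_reducers=3):
    """Return the reducer index for a tab-separated key.

    Hypothetical helper illustrating (first field of key) % num_reducers.
    """
    first_field = key.split("\t", 1)[0]
    return int(first_field) % num_reducers


# Keys built from the sample input: partition, text, range-start.
keys = ["1\tchr1\t1", "1\tchr1\t9", "2\tchr1\t11", "3\tchr1\t22"]
for key in keys:
    print(repr(key), "-> reducer", partition_for(key))
```

With three reducers, partitions 1, 2 and 3 map to indices 1, 2 and 0, so
each partition gets a reducer of its own, although the index does not match
the partition label (partition 3 goes to index 0).
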
Thanks
Utkarsh
