Posted to user@hadoop.apache.org by java8964 java8964 <ja...@hotmail.com> on 2013/10/29 01:28:02 UTC

Why the reducer's input group count is higher than my GroupComparator implementation

Hi, I have a strange question about my secondary sort implementation in an MR job. I need to support secondary sort in one of my MR jobs, so I implemented a custom WritableComparable like the following:

    public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
        String type;
        long id1;
        String id2;
        String id3;
        String id4;
        long timestamp1;
        long timestamp2;
    }

Then I implemented the following methods for this class:

    public int compareTo();                  // sort on all the attributes listed above, with the last 2 timestamps descending
    public int hashCode();                   // generate the hash code using all the attributes above
    public boolean equals();                 // use all the attributes for the equality check
    public void write(DataOutput out)        // serialize all the attributes listed above
    public void readFields(DataInput in)     // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic: depending on the type, the data is partitioned either by year or by day of timestamp1.
For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse sorted by (timestamp1, timestamp2).
I implemented my KeyComparator using the sort order above, and my Partitioner and GroupComparator using the partitioning and grouping logic above.
Here is the pseudo code of the Partitioner and GroupComparator:

    public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
        @Override
        public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
            int hashCode = key.getActivityType().name().hashCode();
            StringBuilder sb = new StringBuilder();
            for (String subPartitionValue : key.getPartitionValue()) {
                sb.append(subPartitionValue);
            }
            // Mask the sign bit instead of calling Math.abs(), which still
            // returns a negative value for Integer.MIN_VALUE.
            return ((hashCode * 127 + sb.toString().hashCode()) & Integer.MAX_VALUE) % numPartitions;
        }

        @Override
        public void configure(JobConf job) {
        }
    }
    // getPartitionValue() returns a string array of either {YYYY} or {YYYY, MM, DD}, derived from timestamp1.

For GroupComparator:

    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);
            // different type, send to different group
            if (cmp != 0)
                return cmp;

            // for the same type, should have the same partition value array length
            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();
            assert partitionValue1.length == partitionValue2.length;
            StringBuilder sb1 = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {
                sb1.append(subValue);
            }
            for (String subValue : partitionValue2) {
                sb2.append(subValue);
            }
            return sb1.toString().compareTo(sb2.toString());
        }
    }

Now, here is the strange problem I don't understand. I tested my MR job with data that contains 7 types: 3 of them are partitioned yearly and 4 are partitioned daily. For each of the 4 types partitioned daily, the test data covers 2 days. So I expected the reducer's input group count to be 4 x 2 + 3 = 11. In fact, if I don't use the custom MyPartitionKey and just use Text as the key type, with "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily datasets, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, the MR job generates 51792 input groups for the reducer at runtime. This doesn't make sense.
If I change the MyGroupComparator compare() method to compare only the type, like the following:

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        return key1.type.compareTo(key2.type);
    }

the MR job generates 7 input groups for the reducer, which is what I expect. But as soon as I add the comparison of the YYYY or MM or DD data parsed out of timestamp1, the input group count becomes very large.
My guess is that id1, id2, id3, and id4 make the input group count large, because the test data contains many unique combinations of (id1, id2, id3, id4). But they are NOT part of my GroupComparator implementation. Why is the reducer's input group count so high in this case? With this behavior, the MR job won't do what I want, because the data of one group is not presented to the reducer as a single group. Here is a summary of my questions:
1) My understanding is that the GroupComparator is the only class that controls the input groups of the reducer. Is that correct?
2) If so, in my case above, I know MyGroupComparator distinguishes only 11 unique values in my test data. Why are 51792 input groups generated? This big number must come from (id1, id2, id3, id4), but these ids are not used in MyGroupComparator, so why do they affect the reducer input group count?
3) If I only use type in my GroupComparator, I get the correct 7 input groups for the reducer. So in this case it correctly ignores all the other data contained in the MyPartitionKey class. Why? Does the order of the attributes make any difference? I don't think so, but I cannot explain the result above.
If you have any ideas, or if my implementation has a problem, please let me know.
Thanks
Yong

RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
OK.
I finally found the reason for this problem after reading the Hadoop ReduceTask source code.
Here is the mistake I made, and something new I learned about secondary sorting.
In my original implementation, I partitioned the data by YYYYMMDD, but I didn't include this information in the sort order. In other words, if you want to group and partition the data on field1 and field2, and sort it on field3 and field4, you have to sort on field1 and field2 first, and then sort the rest in whatever order you want. My mistake was that I didn't sort on field2, even though I grouped and partitioned on it.
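
A minimal sketch of that rule in plain Java, without any Hadoop dependencies. The `Key` class, its reduced field set, and the placeholder values are assumptions made for illustration and are not the MyPartitionKey from the original post; `partitionValue` stands in for the YYYY or YYYYMMDD string derived from timestamp1. The point is only the order of the comparisons: the grouping fields come first, then the secondary-sort fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SortOrderSketch {

    // Hypothetical, simplified stand-in for MyPartitionKey.
    static final class Key {
        final String type;
        final String partitionValue; // plays the role of YYYY or YYYYMMDD
        final long id1;
        final long timestamp1;

        Key(String type, String partitionValue, long id1, long timestamp1) {
            this.type = type;
            this.partitionValue = partitionValue;
            this.id1 = id1;
            this.timestamp1 = timestamp1;
        }
    }

    // Grouping fields first (type, partitionValue), then the secondary sort:
    // id1 ascending, timestamp1 descending.
    static final Comparator<Key> SORT_ORDER =
            Comparator.comparing((Key k) -> k.type)
                    .thenComparing(k -> k.partitionValue)
                    .thenComparingLong(k -> k.id1)
                    .thenComparing(Comparator.comparingLong((Key k) -> k.timestamp1).reversed());

    // Returns a sorted copy so the input list is left untouched.
    static List<Key> sort(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
                new Key("type1", "YYYYMMDD2", 2L, 100L),
                new Key("type1", "YYYYMMDD1", 3L, 100L),
                new Key("type1", "YYYYMMDD1", 1L, 100L),
                new Key("type1", "YYYYMMDD1", 1L, 200L));
        for (Key k : sort(keys)) {
            System.out.println(k.type + ", " + k.partitionValue + ", " + k.id1 + ", " + k.timestamp1);
        }
    }
}
```

With this ordering, all keys that share (type, partitionValue) end up next to each other, which is what the grouping step relies on.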
The new thing I learned from reading the source code is that a reducer can receive not just one group of data but multiple groups. This makes sense, as you could have a lot of input groups but only a small number of reducers to handle them. The trick is that the grouping comparator and the sort order together decide whether a new reducer input group is created.
For example, if my data arrives as:

    type1, id1, YYYYMMDD1
    type1, id2, YYYYMMDD2
    type1, id3, YYYYMMDD1
    type1, id4, YYYYMMDD2

These are treated as 4 input groups for the reducer instead of 2, even though the data contains only 2 unique values, (type1, YYYYMMDD1) and (type1, YYYYMMDD2). Because YYYYMMDD is not part of the sort order, the data can arrive in the order above; the GroupComparator then returns a non-zero result for each comparison of adjacent keys, which leads to 4 input groups.
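
The effect can be reproduced outside Hadoop with a small simulation. This is a sketch under stated assumptions, not the ReduceTask code: it assumes a new group starts whenever the grouping comparison of two adjacent keys in the sorted stream is non-zero, and the `Key` class, the comparator names, and `countGroups` are hypothetical helpers introduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class GroupCountSketch {

    // Hypothetical, simplified key: (type, id, day), as in the example above.
    static final class Key {
        final String type;
        final String id;
        final String day;

        Key(String type, String id, String day) {
            this.type = type;
            this.id = id;
            this.day = day;
        }
    }

    // Grouping looks only at (type, day), like the GroupComparator in the post.
    static final Comparator<Key> GROUPING =
            Comparator.comparing((Key k) -> k.type).thenComparing(k -> k.day);

    // Original mistake: the sort order ignores the day.
    static final Comparator<Key> BY_TYPE_ID =
            Comparator.comparing((Key k) -> k.type).thenComparing(k -> k.id);

    // Fixed sort order: grouping fields first, then the id.
    static final Comparator<Key> BY_TYPE_DAY_ID =
            Comparator.comparing((Key k) -> k.type)
                    .thenComparing(k -> k.day)
                    .thenComparing(k -> k.id);

    static List<Key> sampleKeys() {
        return Arrays.asList(
                new Key("type1", "id1", "YYYYMMDD1"),
                new Key("type1", "id2", "YYYYMMDD2"),
                new Key("type1", "id3", "YYYYMMDD1"),
                new Key("type1", "id4", "YYYYMMDD2"));
    }

    // Sorts a copy of the keys, then counts a new group each time the
    // grouping comparator reports a difference between adjacent keys.
    static int countGroups(List<Key> keys, Comparator<Key> sortOrder) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(sortOrder);
        int groups = 0;
        Key previous = null;
        for (Key current : sorted) {
            if (previous == null || GROUPING.compare(previous, current) != 0) {
                groups++;
            }
            previous = current;
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(countGroups(sampleKeys(), BY_TYPE_ID));     // prints 4
        System.out.println(countGroups(sampleKeys(), BY_TYPE_DAY_ID)); // prints 2
    }
}
```

The same four keys and the same grouping comparator produce 4 groups or 2 groups depending only on the sort order, which matches the behavior described above.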
Lesson learned: being able to read the source code is a huge benefit.
Yong
From: java8964@hotmail.com
To: user@hadoop.apache.org
Subject: RE: Why the reducer's input group count is higher than my GroupComparator implementation
Date: Tue, 29 Oct 2013 09:46:34 -0400




Yes. 
The Partitioner uses the same hashCode() on the String generated from (type + YYYY/MM/DD).
I added logging to the GroupComparator and observed that only 11 unique values are being compared there, but I don't know why the reducer's input group count is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you override the partitioner as well?


RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
OK. 
Finally I found out the reason for this problem, after reading the Hadoop ReducerTask source code.
Here is the mistake I made, and something new I learned about secondary sorting.
In my originally implementation, I partitioned the data by YYYYMMDD, but I didn't add this information in the sort order. That said, if you want to group and partition the data based on field1, field2, and sort the data on field 3, field, you have to sort the data based on field1, field2 first, then sort remaining based on what order you want. My mistake is that I didn't sort on field 2, even I do group and partition also on field 2.
New thing I learned after reading source code is that the NOT just one group data could send to one reducer, but multi groups of data. I think this makes sense, as you could have a lot of input groups data, but only small number of reducers to handle them. But the trick is that it is both Grouping Comparator and the sort order together decide if a new input group of reducer created or not.
For example, in my data, if I have:
type1, id1, YYYYMMDD1type1, id2, YYYYMMDD2type1, id3, YYYYMMDD1type1, id4, YYYYMMDD2
These will consider as 4 input group to the reducer, instead of 2 (Even the data only have (type1, YYYYMMDD1) and (type1, YYYYMMDD2) 2 unique values), but since YYYYMMDD is not part of the sorting order, so the data could arrive in the above order, and the GroupCompare will get non-zero result for each compare, then leads to 4 input groups generated.
Lesson learned, and be able to read the source code is a huge benefit.
Yong
From: java8964@hotmail.com
To: user@hadoop.apache.org
Subject: RE: Why the reducer's input group count is higher than my GroupComparator implementation
Date: Tue, 29 Oct 2013 09:46:34 -0400




Yes. 
The Partitioner uses the same hashcode() on the String generated from the (type + YYYY/MM/DD).
I add the log in the GroupComparator, and observed there are only 11 unique values being compared in the GroupComparator, but don't know why the reducers input group number is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you overwrite the partitioner as well?


2013/10/29 java8964 java8964 <ja...@hotmail.com>




Hi, I have a strange question related to my secondary sort implementation in the MR job.Currently I need to support 2nd sort in one of my MR job. I implemented my custom WritableComparable like following:

public class MyPartitionKey implements WritableComparable<MyPartitionKey> {    String type;    long id1;    String id2;    String id3;    String id4;
    long timestamp1;    long timestamp2}
Then I implemented following methods for this class:
public int compareTo(); // sort the data based on all attributes listed above, sorted the last 2 timestamps descending
public int hashCode(); // generate the hashcode using all attributes abovepublic boolean equals(); // using all the attributes for equals checkpublic void write(DataOutput out) // serialize all the attributes listed above
public void readFields(DataInput in) // deserialize all the attributes listed above
For partition and grouping of my keys, I want the following logic:Based on the type, the data could partition either by year or by day for timestamp1.

For sorting order, I want the data sort by (type, id1, id2, id3, id4), then reverse sorting by (timestamp1, timestamp2).
I implemented my KeyComparator using my sorting order logic listed above, and my Partitioner and GroupComparator based on my logic listed above.

Here is the pseudo code of the Partitioner and GroupComparator:
public class MyPartitioner implements Partitioner {    @Override    public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
        int hashCode = key.getActivityType().name().hashCode();        StringBuilder sb = new StringBuilder();        for (String subPartitionValue : key.getPartitionValue()) {            sb.append(subPartitionValue);
        }        return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;    }
    @Override    public void configure(JobConf job) {
    }}
// The key getPartitionValue method will return array of string of either YYYY or {YYYY, MM, DD} of the timestamp1.
For GroupComparator:

    public static class MyGroupComparator extends WritableComparator {        protected MyGroupComparator() {            super(MyPartitionKey.class, true);
        }
        @Override        public int compare(WritableComparable w1, WritableComparable w2) {            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);            // different type, send to different group            if (cmp != 0)                return cmp;

            // for the same type, should have the same partition value array length            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();            assert partitionValue1.length == partitionValue2.length;            StringBuilder sb1 = new StringBuilder();            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {                sb1.append(subValue);            }            for (String subValue : partitionValue2) {                sb2.append(subValue);
            }            return sb1.toString().compareTo(sb2.toString());        }
Now, here is the strange problem I don't understand. I tested with my MR job. I know in the test data, I have 7 types data, 3 of them partitioned yearly, 4 of them partition daily. In the test data, for the 3 types partitioned daily, there are 2 days data of each type. So I expected the Input group count of the reducer should be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use this custom MyPartitionKey, just use  Text as the key type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily dataset, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, runtime MR job generates 51792 input groups for the reducer. This doesn't make sense. 

If I changed MyGroupComparator compare() method, to only compare the type, like following:      @Override        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;            return key1.type.compareTo(key2.type);       }
The MR job generates 7 input group for the reducer, which is what I expects. But when I start to add the comparing of the YYYY or MM or DD data parsed out from the timestamp1, the input group count became very large.

What I think is that maybe Id1, Id2, Id3, and Id4 makes the input group large, because in the test data, there are a lot of combination of unique (id1,id2,id3,id4). But they are NOT part of my GroupComparator implementation. Why in this case, the input group count for the reducer is so high? And in this case, the MR job won't do what I want, as same group of data NOT being sent to the same reducer. Here are the summary of my questions:

1) My understanding is that GroupComparator is the only class to control the input groups of the reducer, is that correct?2) If so, in my case above, I know MyGroupComparator will return 11 unique values from my test data. Why there are 51792 input groups generated? This big number must come from (Id1, Id2, Id3 and Id4), but these Ids are not used in MyGroupComparator, why they affect the reducer input group count?
3) If I only use type in my GroupComparator, I got correct 7 input groups for the reducer. So in this case, it correctly ignored all other data contains in the MyPartitionKey class, why? Is the order of the attributes make any difference? I don't think so, but I cannot explain the above result.

If you have any idea, or my implementation has any problem, please let me know.
Thanks
Yong 		 	   		  

 		 	   		   		 	   		  

RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
OK. 
Finally I found out the reason for this problem, after reading the Hadoop ReducerTask source code.
Here is the mistake I made, and something new I learned about secondary sorting.
In my originally implementation, I partitioned the data by YYYYMMDD, but I didn't add this information in the sort order. That said, if you want to group and partition the data based on field1, field2, and sort the data on field 3, field, you have to sort the data based on field1, field2 first, then sort remaining based on what order you want. My mistake is that I didn't sort on field 2, even I do group and partition also on field 2.
New thing I learned after reading source code is that the NOT just one group data could send to one reducer, but multi groups of data. I think this makes sense, as you could have a lot of input groups data, but only small number of reducers to handle them. But the trick is that it is both Grouping Comparator and the sort order together decide if a new input group of reducer created or not.
For example, in my data, if I have:
type1, id1, YYYYMMDD1type1, id2, YYYYMMDD2type1, id3, YYYYMMDD1type1, id4, YYYYMMDD2
These will consider as 4 input group to the reducer, instead of 2 (Even the data only have (type1, YYYYMMDD1) and (type1, YYYYMMDD2) 2 unique values), but since YYYYMMDD is not part of the sorting order, so the data could arrive in the above order, and the GroupCompare will get non-zero result for each compare, then leads to 4 input groups generated.
Lesson learned, and be able to read the source code is a huge benefit.
Yong
From: java8964@hotmail.com
To: user@hadoop.apache.org
Subject: RE: Why the reducer's input group count is higher than my GroupComparator implementation
Date: Tue, 29 Oct 2013 09:46:34 -0400




Yes. 
The Partitioner uses the same hashcode() on the String generated from the (type + YYYY/MM/DD).
I add the log in the GroupComparator, and observed there are only 11 unique values being compared in the GroupComparator, but don't know why the reducers input group number is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you overwrite the partitioner as well?


2013/10/29 java8964 java8964 <ja...@hotmail.com>




Hi, I have a strange question related to my secondary sort implementation in the MR job.Currently I need to support 2nd sort in one of my MR job. I implemented my custom WritableComparable like following:

public class MyPartitionKey implements WritableComparable<MyPartitionKey> {    String type;    long id1;    String id2;    String id3;    String id4;
    long timestamp1;    long timestamp2}
Then I implemented following methods for this class:
public int compareTo(); // sort the data based on all attributes listed above, sorted the last 2 timestamps descending
public int hashCode(); // generate the hashcode using all attributes abovepublic boolean equals(); // using all the attributes for equals checkpublic void write(DataOutput out) // serialize all the attributes listed above
public void readFields(DataInput in) // deserialize all the attributes listed above
For partition and grouping of my keys, I want the following logic:Based on the type, the data could partition either by year or by day for timestamp1.

For sorting order, I want the data sort by (type, id1, id2, id3, id4), then reverse sorting by (timestamp1, timestamp2).
I implemented my KeyComparator using my sorting order logic listed above, and my Partitioner and GroupComparator based on my logic listed above.

Here is the pseudo code of the Partitioner and GroupComparator:
public class MyPartitioner implements Partitioner {    @Override    public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
        int hashCode = key.getActivityType().name().hashCode();        StringBuilder sb = new StringBuilder();        for (String subPartitionValue : key.getPartitionValue()) {            sb.append(subPartitionValue);
        }        return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;    }
    @Override    public void configure(JobConf job) {
    }}
// The key getPartitionValue method will return array of string of either YYYY or {YYYY, MM, DD} of the timestamp1.
For GroupComparator:

    public static class MyGroupComparator extends WritableComparator {        protected MyGroupComparator() {            super(MyPartitionKey.class, true);
        }
        @Override        public int compare(WritableComparable w1, WritableComparable w2) {            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);            // different type, send to different group            if (cmp != 0)                return cmp;

            // for the same type, should have the same partition value array length            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();            assert partitionValue1.length == partitionValue2.length;            StringBuilder sb1 = new StringBuilder();            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {                sb1.append(subValue);            }            for (String subValue : partitionValue2) {                sb2.append(subValue);
            }            return sb1.toString().compareTo(sb2.toString());        }
Now, here is the strange problem I don't understand. I tested with my MR job. I know in the test data, I have 7 types data, 3 of them partitioned yearly, 4 of them partition daily. In the test data, for the 3 types partitioned daily, there are 2 days data of each type. So I expected the Input group count of the reducer should be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use this custom MyPartitionKey, just use  Text as the key type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily dataset, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, runtime MR job generates 51792 input groups for the reducer. This doesn't make sense. 

If I changed MyGroupComparator compare() method, to only compare the type, like following:      @Override        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;            return key1.type.compareTo(key2.type);       }
The MR job generates 7 input group for the reducer, which is what I expects. But when I start to add the comparing of the YYYY or MM or DD data parsed out from the timestamp1, the input group count became very large.

What I think is that maybe Id1, Id2, Id3, and Id4 makes the input group large, because in the test data, there are a lot of combination of unique (id1,id2,id3,id4). But they are NOT part of my GroupComparator implementation. Why in this case, the input group count for the reducer is so high? And in this case, the MR job won't do what I want, as same group of data NOT being sent to the same reducer. Here are the summary of my questions:

1) My understanding is that GroupComparator is the only class to control the input groups of the reducer, is that correct?2) If so, in my case above, I know MyGroupComparator will return 11 unique values from my test data. Why there are 51792 input groups generated? This big number must come from (Id1, Id2, Id3 and Id4), but these Ids are not used in MyGroupComparator, why they affect the reducer input group count?
3) If I only use type in my GroupComparator, I got correct 7 input groups for the reducer. So in this case, it correctly ignored all other data contains in the MyPartitionKey class, why? Is the order of the attributes make any difference? I don't think so, but I cannot explain the above result.

If you have any idea, or my implementation has any problem, please let me know.
Thanks
Yong 		 	   		  

 		 	   		   		 	   		  

RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
OK. 
Finally I found out the reason for this problem, after reading the Hadoop ReducerTask source code.
Here is the mistake I made, and something new I learned about secondary sorting.
In my originally implementation, I partitioned the data by YYYYMMDD, but I didn't add this information in the sort order. That said, if you want to group and partition the data based on field1, field2, and sort the data on field 3, field, you have to sort the data based on field1, field2 first, then sort remaining based on what order you want. My mistake is that I didn't sort on field 2, even I do group and partition also on field 2.
The new thing I learned from reading the source code is that a reducer can receive not just one group of data but many groups. This makes sense, as you can have a lot of input groups but only a small number of reducers to handle them. The trick is that the grouping comparator and the sort order together decide whether a new reducer input group is created.
For example, suppose my data arrives as:
type1, id1, YYYYMMDD1
type1, id2, YYYYMMDD2
type1, id3, YYYYMMDD1
type1, id4, YYYYMMDD2
These are treated as 4 input groups for the reducer instead of 2, even though the data contains only 2 unique values, (type1, YYYYMMDD1) and (type1, YYYYMMDD2). Because YYYYMMDD is not part of the sort order, the data can arrive in the order above; the GroupComparator is applied to adjacent keys only, so it returns a non-zero result for each comparison, and that leads to 4 input groups.
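The effect can be reproduced outside Hadoop with a small simulation. This is only a sketch under stated assumptions: the Row class and the countGroups helper are hypothetical names, and countGroups mimics the reduce side by starting a new group whenever the grouping comparator says the current key differs from the previous one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupCountSketch {
    // Hypothetical, cut-down key holding only the fields used in the example.
    static final class Row {
        final String type;
        final String id;
        final String day;

        Row(String type, String id, String day) {
            this.type = type;
            this.id = id;
            this.day = day;
        }
    }

    // Grouping comparator: looks at (type, day) only, like MyGroupComparator.
    static final Comparator<Row> GROUPING =
        Comparator.comparing((Row r) -> r.type).thenComparing(r -> r.day);

    // The four rows from the example above.
    static List<Row> sampleRows() {
        return new ArrayList<>(Arrays.asList(
            new Row("type1", "id1", "YYYYMMDD1"),
            new Row("type1", "id2", "YYYYMMDD2"),
            new Row("type1", "id3", "YYYYMMDD1"),
            new Row("type1", "id4", "YYYYMMDD2")));
    }

    // A new group starts whenever two adjacent rows compare as different.
    static int countGroups(List<Row> sortedRows) {
        int groups = 0;
        Row previous = null;
        for (Row current : sortedRows) {
            if (previous == null || GROUPING.compare(previous, current) != 0) {
                groups++;
            }
            previous = current;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> rows = sampleRows();

        // Sort order without the day: (type, id). The day values alternate.
        rows.sort(Comparator.comparing((Row r) -> r.type).thenComparing(r -> r.id));
        System.out.println(countGroups(rows)); // prints 4

        // Sort order with the grouping fields first: (type, day, id).
        rows.sort(GROUPING.thenComparing(r -> r.id));
        System.out.println(countGroups(rows)); // prints 2
    }
}
```

In a real job using the old mapred API, the two comparators are registered separately with JobConf.setOutputKeyComparatorClass and JobConf.setOutputValueGroupingComparator, so nothing forces them to agree; keeping the grouping fields as a prefix of the sort order is up to the job author.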
Lesson learned: being able to read the source code is a huge benefit.
Yong
From: java8964@hotmail.com
To: user@hadoop.apache.org
Subject: RE: Why the reducer's input group count is higher than my GroupComparator implementation
Date: Tue, 29 Oct 2013 09:46:34 -0400




Yes.
The Partitioner uses the same hashCode() on the String generated from (type + YYYY/MM/DD).
I added logging to the GroupComparator and observed that only 11 unique values are compared there, but I don't know why the reducer's input group count is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you override the partitioner as well?


2013/10/29 java8964 java8964 <ja...@hotmail.com>




Hi, I have a strange question related to my secondary sort implementation in an MR job.
Currently I need to support secondary sort in one of my MR jobs. I implemented my custom WritableComparable as follows:

public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
    String type;
    long id1;
    String id2;
    String id3;
    String id4;
    long timestamp1;
    long timestamp2;
}

Then I implemented the following methods for this class:

public int compareTo(); // sort the data based on all attributes listed above, with the last 2 timestamps descending
public int hashCode(); // generate the hash code using all attributes above
public boolean equals(); // use all the attributes for the equality check
public void write(DataOutput out) // serialize all the attributes listed above
public void readFields(DataInput in) // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic:
Based on the type, the data is partitioned either by year or by day of timestamp1.

For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse sorted by (timestamp1, timestamp2).
I implemented my KeyComparator using the sort order above, and my Partitioner and GroupComparator using the partitioning and grouping logic above.

Here is the pseudo code of the Partitioner and GroupComparator:

public class MyPartitioner implements Partitioner {
    @Override
    public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
        int hashCode = key.getActivityType().name().hashCode();
        StringBuilder sb = new StringBuilder();
        for (String subPartitionValue : key.getPartitionValue()) {
            sb.append(subPartitionValue);
        }
        return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;
    }

    @Override
    public void configure(JobConf job) {
    }
}
// The key's getPartitionValue method returns an array of strings, either {YYYY} or {YYYY, MM, DD}, derived from timestamp1.
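
One detail of the partitioner above is worth a small sketch: Math.abs returns a negative number for Integer.MIN_VALUE, so Math.abs(hash) % numPartitions can be negative for that one hash value. The toPartition helper below is a hypothetical name, not part of the original job; it masks off the sign bit instead, which is the approach Hadoop's own HashPartitioner takes.

```java
class PartitionIndexSketch {
    // Hypothetical helper: maps any int hash to the range [0, numPartitions).
    static int toPartition(int hash, int numPartitions) {
        // Clearing the sign bit always yields a non-negative value.
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int worstCase = Integer.MIN_VALUE;
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so this is negative.
        System.out.println(Math.abs(worstCase) % 7);
        // The masked version stays inside the valid range.
        System.out.println(toPartition(worstCase, 7));
    }
}
```

This does not change which keys end up together; it only guards against an out-of-range partition index.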
For the GroupComparator:

public static class MyGroupComparator extends WritableComparator {
    protected MyGroupComparator() {
        super(MyPartitionKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        int cmp = key1.type.compareTo(key2.type);
        // different type, send to different group
        if (cmp != 0)
            return cmp;

        // for the same type, should have the same partition value array length
        String[] partitionValue1 = key1.getPartitionValue();
        String[] partitionValue2 = key2.getPartitionValue();
        assert partitionValue1.length == partitionValue2.length;
        StringBuilder sb1 = new StringBuilder();
        StringBuilder sb2 = new StringBuilder();
        for (String subValue : partitionValue1) {
            sb1.append(subValue);
        }
        for (String subValue : partitionValue2) {
            sb2.append(subValue);
        }
        return sb1.toString().compareTo(sb2.toString());
    }
}
Now, here is the strange problem I don't understand. I tested this with my MR job. In the test data I have 7 types of data; 3 of them are partitioned yearly and 4 of them are partitioned daily. For the 4 types partitioned daily, there are 2 days of data for each type. So I expected the reducer's input group count to be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use the custom MyPartitionKey and just use Text as the key type, with "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily datasets, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, the MR job generates 51792 input groups for the reducer at runtime. This doesn't make sense.

If I change the MyGroupComparator compare() method to compare only the type, like the following:

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        return key1.type.compareTo(key2.type);
    }

the MR job generates 7 input groups for the reducer, which is what I expect. But when I start adding the comparison of the YYYY, MM, or DD values parsed from timestamp1, the input group count becomes very large.

What I think is that maybe id1, id2, id3, and id4 make the input group count large, because the test data contains many unique combinations of (id1, id2, id3, id4). But they are NOT part of my GroupComparator implementation. Why is the input group count for the reducer so high in this case? The MR job also won't do what I want here, as the same group of data is NOT being sent to the same reducer. Here is a summary of my questions:

1) My understanding is that the GroupComparator is the only class that controls the reducer's input groups. Is that correct?
2) If so, in my case above, I know MyGroupComparator returns 11 unique values for my test data. Why are 51792 input groups generated? This large number must come from (id1, id2, id3, id4), but these ids are not used in MyGroupComparator, so why do they affect the reducer input group count?
3) If I use only the type in my GroupComparator, I get the correct 7 input groups for the reducer. In that case it correctly ignores all the other data contained in the MyPartitionKey class. Why? Does the order of the attributes make any difference? I don't think so, but I cannot explain the result above.

If you have any idea, or my implementation has any problem, please let me know.
Thanks
Yong


RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
Yes.
The Partitioner uses the same hashCode() on the String generated from (type + YYYY/MM/DD).
I added logging to the GroupComparator and observed that only 11 unique values are compared there, but I don't know why the reducer's input group count is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you override the partitioner as well?



RE: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by java8964 java8964 <ja...@hotmail.com>.
Yes. 
The Partitioner uses the same hashcode() on the String generated from the (type + YYYY/MM/DD).
I add the log in the GroupComparator, and observed there are only 11 unique values being compared in the GroupComparator, but don't know why the reducers input group number is much higher than 11.

Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation
From: drdwitte@gmail.com
To: user@hadoop.apache.org

Did you overwrite the partitioner as well?


2013/10/29 java8964 java8964 <ja...@hotmail.com>




Hi, I have a strange question related to my secondary sort implementation in the MR job.Currently I need to support 2nd sort in one of my MR job. I implemented my custom WritableComparable like following:

public class MyPartitionKey implements WritableComparable<MyPartitionKey> {    String type;    long id1;    String id2;    String id3;    String id4;
    long timestamp1;    long timestamp2}
Then I implemented following methods for this class:
public int compareTo(); // sort the data based on all attributes listed above, sorted the last 2 timestamps descending
public int hashCode(); // generate the hashcode using all attributes abovepublic boolean equals(); // using all the attributes for equals checkpublic void write(DataOutput out) // serialize all the attributes listed above
public void readFields(DataInput in) // deserialize all the attributes listed above
For partition and grouping of my keys, I want the following logic:Based on the type, the data could partition either by year or by day for timestamp1.

For sorting order, I want the data sort by (type, id1, id2, id3, id4), then reverse sorting by (timestamp1, timestamp2).
I implemented my KeyComparator using my sorting order logic listed above, and my Partitioner and GroupComparator based on my logic listed above.

Here is the pseudo code of the Partitioner and GroupComparator:
public class MyPartitioner implements Partitioner {    @Override    public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
        int hashCode = key.getActivityType().name().hashCode();        StringBuilder sb = new StringBuilder();        for (String subPartitionValue : key.getPartitionValue()) {            sb.append(subPartitionValue);
        }        return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;    }
    @Override    public void configure(JobConf job) {
    }}
// The key getPartitionValue method will return array of string of either YYYY or {YYYY, MM, DD} of the timestamp1.
For GroupComparator:

    public static class MyGroupComparator extends WritableComparator {        protected MyGroupComparator() {            super(MyPartitionKey.class, true);
        }
        @Override        public int compare(WritableComparable w1, WritableComparable w2) {            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);            // different type, send to different group            if (cmp != 0)                return cmp;

            // for the same type, should have the same partition value array length            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();            assert partitionValue1.length == partitionValue2.length;            StringBuilder sb1 = new StringBuilder();            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {                sb1.append(subValue);            }            for (String subValue : partitionValue2) {                sb2.append(subValue);
            }            return sb1.toString().compareTo(sb2.toString());        }
Now, here is the strange problem I don't understand. I tested with my MR job. I know in the test data, I have 7 types data, 3 of them partitioned yearly, 4 of them partition daily. In the test data, for the 3 types partitioned daily, there are 2 days data of each type. So I expected the Input group count of the reducer should be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use this custom MyPartitionKey, just use  Text as the key type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily dataset, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, runtime MR job generates 51792 input groups for the reducer. This doesn't make sense. 

If I changed MyGroupComparator compare() method, to only compare the type, like following:      @Override        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;            MyPartitionKey key2 = (MyPartitionKey) w2;            return key1.type.compareTo(key2.type);       }
The MR job generates 7 input group for the reducer, which is what I expects. But when I start to add the comparing of the YYYY or MM or DD data parsed out from the timestamp1, the input group count became very large.

What I think is that maybe Id1, Id2, Id3, and Id4 makes the input group large, because in the test data, there are a lot of combination of unique (id1,id2,id3,id4). But they are NOT part of my GroupComparator implementation. Why in this case, the input group count for the reducer is so high? And in this case, the MR job won't do what I want, as same group of data NOT being sent to the same reducer. Here are the summary of my questions:

1) My understanding is that the GroupComparator is the only class that controls the input groups of the reducer. Is that correct?
2) If so, in my case above, I know MyGroupComparator will produce 11 unique values from my test data. Why are 51792 input groups generated? This big number must come from (id1, id2, id3, id4), but these ids are not used in MyGroupComparator, so why do they affect the reducer input group count?
3) If I only use type in my GroupComparator, I get the correct 7 input groups for the reducer. So in this case, it correctly ignores all the other data contained in the MyPartitionKey class. Why? Does the order of the attributes make any difference? I don't think so, but I cannot explain the above result.
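
A minimal, Hadoop-free sketch of what might explain question 2. It assumes (as I believe is the case for MapReduce) that the reducer does not collect equal keys globally: it walks the keys in sort order and starts a new input group whenever the grouping comparator says the current key differs from the previous one. The Key class is a hypothetical, simplified stand-in for MyPartitionKey (one id instead of four, a precomputed day string instead of timestamp1), and all names below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {

    // Hypothetical, simplified key: type, a single id, and the day of timestamp1.
    static final class Key {
        final String type;
        final long id;
        final String day;

        Key(String type, long id, String day) {
            this.type = type;
            this.id = id;
            this.day = day;
        }
    }

    // Sort order as described in the post: type, then id, then day descending.
    static final Comparator<Key> SORT_ORDER = (a, b) -> {
        int c = a.type.compareTo(b.type);
        if (c != 0) return c;
        c = Long.compare(a.id, b.id);
        if (c != 0) return c;
        return b.day.compareTo(a.day);
    };

    // Alternative sort order: the grouping fields (type, day) come before the id.
    static final Comparator<Key> SORT_GROUP_FIELDS_FIRST = (a, b) -> {
        int c = a.type.compareTo(b.type);
        if (c != 0) return c;
        c = b.day.compareTo(a.day);
        if (c != 0) return c;
        return Long.compare(a.id, b.id);
    };

    static final Comparator<Key> GROUP_BY_TYPE = (a, b) -> a.type.compareTo(b.type);

    static final Comparator<Key> GROUP_BY_TYPE_AND_DAY = (a, b) -> {
        int c = a.type.compareTo(b.type);
        return c != 0 ? c : a.day.compareTo(b.day);
    };

    // Counts groups the way the assumption above describes: sort, then compare
    // each key only with its predecessor using the grouping comparator.
    static int countGroups(List<Key> keys, Comparator<Key> sort, Comparator<Key> group) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(sort);
        int groups = 0;
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || group.compare(previous, k) != 0) {
                groups++;
            }
            previous = k;
        }
        return groups;
    }

    // One type, three ids, two days per id: six keys in total.
    static List<Key> sampleKeys() {
        List<Key> keys = new ArrayList<>();
        for (long id = 1; id <= 3; id++) {
            keys.add(new Key("A", id, "20131028"));
            keys.add(new Key("A", id, "20131029"));
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Key> keys = sampleKeys();
        System.out.println(countGroups(keys, SORT_ORDER, GROUP_BY_TYPE));                      // prints 1
        System.out.println(countGroups(keys, SORT_ORDER, GROUP_BY_TYPE_AND_DAY));              // prints 6
        System.out.println(countGroups(keys, SORT_GROUP_FIELDS_FIRST, GROUP_BY_TYPE_AND_DAY)); // prints 2
    }
}
```

With the sort order (type, id, day descending), keys that share the same (type, day) are separated by keys with other ids, so grouping by (type, day) yields one group per run of adjacent keys rather than one per (type, day). Grouping by type alone still works because type is the leading sort field. If this is the cause, the KeyComparator would need to compare the grouping fields before the ids.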

If you have any idea, or my implementation has any problem, please let me know.
Thanks
Yong

Re: Why the reducer's input group count is higher than my GroupComparator implementation

Posted by Dieter De Witte <dr...@gmail.com>.
Did you override the partitioner as well?
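
On the partitioner itself, one detail may be worth checking independently of the grouping question: Math.abs(int) returns Integer.MIN_VALUE unchanged, so Math.abs(hash) % numPartitions can come out negative for that one hash value. Below is a small sketch of a common alternative that clears the sign bit instead. The class and method names are hypothetical and not part of the original job; partitionFor only mirrors the hash computation shown in getPartition().

```java
class PartitionIndexSketch {

    // Maps any int hash to an index in [0, numPartitions) by clearing the sign bit.
    static int nonNegativeMod(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // Hypothetical helper: same hash formula as the posted getPartition(),
    // taking the type name and partition values directly.
    static int partitionFor(String typeName, String[] partitionValue, int numPartitions) {
        StringBuilder sb = new StringBuilder();
        for (String subPartitionValue : partitionValue) {
            sb.append(subPartitionValue);
        }
        int hash = typeName.hashCode() * 127 + sb.toString().hashCode();
        return nonNegativeMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // Math.abs cannot represent +2^31 as an int, so the result stays negative.
        System.out.println(Math.abs(Integer.MIN_VALUE) % 7);       // prints -2
        System.out.println(nonNegativeMod(Integer.MIN_VALUE, 7));  // prints 0
        System.out.println(partitionFor("A", new String[] {"2013", "10", "29"}, 7));
    }
}
```

Keys that agree on the type and the partition values still land in the same partition, since only those fields feed the hash.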


2013/10/29 java8964 java8964 <ja...@hotmail.com>

> Hi, I have a strange question related to my secondary sort implementation
> in the MR job.
> Currently I need to support 2nd sort in one of my MR job. I implemented my
> custom WritableComparable like following:
>
> public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
>     String type;
>     long id1;
>     String id2;
>     String id3;
>     String id4;
>     long timestamp1;
>     long timestamp2
> }
>
> Then I implemented following methods for this class:
>
> public int compareTo(); // sort the data based on all attributes listed
> above, sorted the last 2 timestamps descending
> public int hashCode(); // generate the hashcode using all attributes above
> public boolean equals(); // using all the attributes for equals check
> public void write(DataOutput out) // serialize all the attributes listed
> above
> public void readFields(DataInput in) // deserialize all the attributes
> listed above
>
> For partition and grouping of my keys, I want the following logic:
> Based on the type, the data could partition either by year or by day for
> timestamp1.
>
> For sorting order, I want the data sort by (type, id1, id2, id3, id4),
> then reverse sorting by (timestamp1, timestamp2).
>
> I implemented my KeyComparator using my sorting order logic listed above,
> and my Partitioner and GroupComparator based on my logic listed above.
>
> Here is the pseudo code of the Partitioner and GroupComparator:
>
> public class MyPartitioner implements Partitioner {
>     @Override
>     public int getPartition(MyPartitionKey key, Value value, int
> numPartitions) {
>         int hashCode = key.getActivityType().name().hashCode();
>         StringBuilder sb = new StringBuilder();
>         for (String subPartitionValue : key.getPartitionValue()) {
>             sb.append(subPartitionValue);
>         }
>         return Math.abs(hashCode * 127 + sb.toString().hashCode()) %
> numPartitions;
>     }
>
>     @Override
>     public void configure(JobConf job) {
>     }
> }
>
> // The key getPartitionValue method will return array of string of either
> YYYY or {YYYY, MM, DD} of the timestamp1.
>
> For GroupComparator:
>
>     public static class MyGroupComparator extends WritableComparator {
>         protected MyGroupComparator() {
>             super(MyPartitionKey.class, true);
>         }
>
>         @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             int cmp = key1.type.compareTo(key2.type);
>             // different type, send to different group
>             if (cmp != 0)
>                 return cmp;
>
>             // for the same type, should have the same partition value
> array length
>             String[] partitionValue1 = key1.getPartitionValue();
>             String[] partitionValue2 = key2.getPartitionValue();
>             assert partitionValue1.length == partitionValue2.length;
>             StringBuilder sb1 = new StringBuilder();
>             StringBuilder sb2 = new StringBuilder();
>             for (String subValue : partitionValue1) {
>                 sb1.append(subValue);
>             }
>             for (String subValue : partitionValue2) {
>                 sb2.append(subValue);
>             }
>             return sb1.toString().compareTo(sb2.toString());
>         }
>
> Now, here is the strange problem I don't understand. I tested with my MR
> job. I know in the test data, I have 7 types data, 3 of them partitioned
> yearly, 4 of them partition daily. In the test data, for the 3 types
> partitioned daily, there are 2 days data of each type. So I expected the
> Input group count of the reducer should be 11, which is 4 x 2 + 3 = 11. In
> fact, if I don't use this custom MyPartitionKey, just use  Text as the key
> type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily
> dataset, there are 11 input groups for the reducer. But I have to support
> secondary sort. To my surprise, runtime MR job generates 51792 input groups
> for the reducer. This doesn't make sense.
>
> If I changed MyGroupComparator compare() method, to only compare the
> type, like following:
>       @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             return key1.type.compareTo(key2.type);
>        }
> The MR job generates 7 input group for the reducer, which is what I
> expects. But when I start to add the comparing of the YYYY or MM or DD data
> parsed out from the timestamp1, the input group count became very large.
>
> What I think is that maybe Id1, Id2, Id3, and Id4 makes the input group
> large, because in the test data, there are a lot of combination of unique
> (id1,id2,id3,id4). But they are NOT part of my GroupComparator
> implementation. Why in this case, the input group count for the reducer is
> so high? And in this case, the MR job won't do what I want, as same group
> of data NOT being sent to the same reducer. Here are the summary of my
> questions:
>
> 1) My understanding is that GroupComparator is the only class to control
> the input groups of the reducer, is that correct?
> 2) If so, in my case above, I know MyGroupComparator will return 11 unique
> values from my test data. Why there are 51792 input groups generated? This
> big number must come from (Id1, Id2, Id3 and Id4), but these Ids are not
> used in MyGroupComparator, why they affect the reducer input group count?
> 3) If I only use type in my GroupComparator, I got correct 7 input groups
> for the reducer. So in this case, it correctly ignored all other data
> contains in the MyPartitionKey class, why? Is the order of the attributes
> make any difference? I don't think so, but I cannot explain the above
> result.
>
> If you have any idea, or my implementation has any problem, please let me
> know.
>
> Thanks
>
> Yong
>
