You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hbase.apache.org by Xine Jar <xi...@googlemail.com> on 2009/08/11 18:27:18 UTC

Why the map input records are equal to the map output records

Hallo guys,
I am using hadoop-0.19.1 and hbase-0.19.3 on a cluster of four nodes. I have
created an hbase database composed of one FamilyColumn 'cf' and several
columns ''cf:Value cf:SensorType cf:SensorID .....'. The table has in total
100 records. The programs searches only for SensorType=temperature and tries
to calculate the average of all the temperature readings. The Job is exactly
divided as follows

*Mapper:*
The mapper scans each record in the table, whenever it finds a
SensorType=temperature it executes two steps: First, takes the value of the
reading and sums it with the previously found values, The variable
"sumreadings" contains the sum of the values. Second, the mapper increments
a counter called "numreadings". When there is no more records in the table,
it concatenates in a Text the value of "sumreadings" together with the value
"numreadings" and passes the Text result to the outputcollector.

*Reducer:*
the reducer execute the average by summing all the received "sumreadings"
and dividing the result with the sum of the "numreadings".


*A snapshot of the Mapper :*
*public void map(ImmutableBytesWritable key,RowResult value,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       double numberreadings=0;
       double sumreadings=0;

        if(table==null)
          throw new IOException("table is null");

       //set a scanner
         Scanner scanner=table.getScanner(new String[] {"cf:Value",
"cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude", "cf:SensorNode"});
         RowResult rowresult=scanner.next();

      //scanning the table, filtering out the values, and count them
        while(rowresult!=null){

         String stringtype= new
String((rowresult.get(Bytes.toBytes("cf:Type"))).getValue());

         if((stringtype).equals("temperature")==true)
            ///summ the correct reading value
            {String stringval=new
String((rowresult.get(Bytes.toBytes("cf:Value"))).getValue());
             double doubleval=Double.parseDouble(stringval.trim());
             sumreadings=sumreadings+doubleval;

             ///summ the number of readings
             numberreadings=numberreadings+1;
            }
          rowresult=scanner.next();

         }

        scanner.close();

       //send the summ of the values as well as the number
        String strsumreadings=Double.toString(sumreadings);
        String strnumberreadings=Double.toString(numberreadings);
        String strmapoutvalue= strsumreadings+" "+strnumberreadings;

        mapoutputvalue.set(strmapoutvalue);
        output.collect(mapoutputkey,mapoutputvalue);

 }*


*Questions:*
1-For 100 records, I noticed that I have 1 map task and 1 reduce task, and
the job finishes after 12 Sec. Whenever I extend the number of records in
the htable to 10,000 I still have 1 map and 1 reduce task and the job
finishes after 1 hour!!!!!!
The mapper is incredibly slow, what is so heavy in my code?

2-I have checked the UI from /http:IP:50030/, I noticed that :

    Map Input Records: Map=100 Reduce=0
    Map Output Records: Map=100 Reduce=0
    Map Output bytes:Map=2,500 and Reduce=0

   Since I am calling the collect() on the map's OutputCollector only once
as you see from the snapshot of the code, how come
   the Map Output Records=100??? shouldn't be 1?



Thank you for your help,
CJ


------------------------------
------------------------------
------------------------------
Go back to JobTracker <http://134.130.223.85:50030/jobtracker.jsp>
------------------------------
Hadoop <http://hadoop.apache.org/core>, 2009.

Re: Why the map input records are equal to the map output records

Posted by Mathias De Maré <ma...@gmail.com>.
On Wed, Aug 12, 2009 at 6:57 PM, Xine Jar <xi...@googlemail.com> wrote:

>
> For my own information, is there a way I can verify that it did not read
> the table several times?
> should the Map output record become equal to the number of records in the
> table or not necessarily?
>
> Thank you,


Yes, it should be the same.

Mathias

Re: Why the map input records are equal to the map output records

Posted by Xine Jar <xi...@googlemail.com>.
For my own information, is there a way I can verify that it did not read the
table several times?
should the Map output record become equal to the number of records in the
table or not necessarily?

Thank you,


2009/8/12 Mathias De Maré <ma...@gmail.com>

> Please send your reply to the hbase mailing list as well.
>
> On Wed, Aug 12, 2009 at 3:37 PM, Xine Jar <xi...@googlemail.com>wrote:
>
>> Your proposed architecture is clear. You try to split the calculation load
>> between the
>> mapper and the reducer, and even creating another job for the final
>> calculation.
>>
>> but if I shall keep reading in the mapper the way I am doing it know, this
>> means that in order to sort all the temperature values, I still need to read
>> the table N*N times where N is the number of records in a table? Right? So
>> my initial problem persists!!! Isn't it possible to read the Table only
>> once?
>
>
> Yes, that's possible, and that's what I proposed.
> The map function gets called for each record.
> Your map function could be changed to something simple, like (pseudocode):
>
> map(key, value) {
> output.collect(key.temperature, value);
> }
>
> Then a reduce function (the key is the temperature you passed in the map
> function):
> reduce(key, List values) {
> calculate_average();
> }
>
> And then a second job, like I mentioned.
>
> Mathias
>
>
>>
>>
>> For example if I would be using a textfile as an input instead of an hbase
>> table, the standard way to do, is copying the file to a string, tokenize it,
>> GO THROUGH IT ONCE  and filter out whatever I want!!
>>
>> A similar approach,going through the table only once, is not possible with
>> and Hbase table ?!
>>
>>
>>
>> Regards,
>> CJ
>>
>>
>> 2009/8/12 Mathias De Maré <ma...@gmail.com>
>>
>>> Well, I think you could probably take care of the issue by using a
>>> somewhat different architecture.
>>>
>>> If I understand correctly, you take all of the values with the same
>>> temperature together. This is in fact a Reduce operation.
>>>
>>> You could structure as follows:
>>> -Read in like you do now, but make your Map simpler. For each map (so for
>>> each record), write away the temperature as the key, and the record as a
>>> value.
>>> -Each reducer will then have a list of records, each with the same
>>> temperature. You can sum the entries in the list and write everything away.
>>> Then you will have 1 combined result per temperature.
>>>
>>> You could then start a second job that has a pass-through Mapper, and
>>> then do your final calculation in the Reducer.
>>>
>>> Does it sound like I'm making sense to some degree? :-)
>>>
>>> Mathias
>>>
>>>
>>> On Wed, Aug 12, 2009 at 2:38 PM, Xine Jar <xi...@googlemail.com>wrote:
>>>
>>>> Aha!! I understand!!
>>>> So basically this is the reason why I am getting 100 written Map output
>>>> records. Because the mapper is calling the collect() of the OutputCollector
>>>> 100 times= number of records in the table.
>>>>
>>>> In this case I assume I have to pass the HBASE table instead of the
>>>> records as an input to the mapper right? Is there such a Java example you
>>>> could point it out for me?
>>>>
>>>> Regards,
>>>> CJ
>>>>
>>>>
>>>> 2009/8/12 Mathias De Maré <ma...@gmail.com>
>>>>
>>>> Hi,
>>>>>
>>>>> On Tue, Aug 11, 2009 at 6:27 PM, Xine Jar <xi...@googlemail.com>wrote:
>>>>>>
>>>>>> *A snapshot of the Mapper :*
>>>>>>
>>>>>> *public void map(ImmutableBytesWritable key,RowResult value,
>>>>>> OutputCollector<Text, Text> output, Reporter reporter) throws
>>>>>> IOException {
>>>>>>       double numberreadings=0;
>>>>>>       double sumreadings=0;
>>>>>>
>>>>>>        if(table==null)
>>>>>>          throw new IOException("table is null");
>>>>>>
>>>>>>       //set a scanner
>>>>>>         Scanner scanner=table.getScanner(new String[] {"cf:Value",
>>>>>> "cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude",
>>>>>> "cf:SensorNode"});
>>>>>>         RowResult rowresult=scanner.next();
>>>>>>
>>>>>>      //scanning the table, filtering out the values, and count them
>>>>>>        while(rowresult!=null){
>>>>>>
>>>>>>         String stringtype= new
>>>>>> String((rowresult.get(Bytes.toBytes("cf:Type"))).getValue());
>>>>>>
>>>>>>         if((stringtype).equals("temperature")==true)
>>>>>>            ///summ the correct reading value
>>>>>>            {String stringval=new
>>>>>> String((rowresult.get(Bytes.toBytes("cf:Value"))).getValue());
>>>>>>             double doubleval=Double.parseDouble(stringval.trim());
>>>>>>             sumreadings=sumreadings+doubleval;
>>>>>>
>>>>>>             ///summ the number of readings
>>>>>>             numberreadings=numberreadings+1;
>>>>>>            }
>>>>>>          rowresult=scanner.next();
>>>>>>
>>>>>>         }
>>>>>>
>>>>>>        scanner.close();
>>>>>>
>>>>>>       //send the summ of the values as well as the number
>>>>>>        String strsumreadings=Double.toString(sumreadings);
>>>>>>        String strnumberreadings=Double.toString(numberreadings);
>>>>>>        String strmapoutvalue= strsumreadings+" "+strnumberreadings;
>>>>>>
>>>>>>        mapoutputvalue.set(strmapoutvalue);
>>>>>>        output.collect(mapoutputkey,mapoutputvalue);
>>>>>>
>>>>>>  }*
>>>>>>
>>>>>>
>>>>>> *Questions:*
>>>>>> 1-For 100 records, I noticed that I have 1 map task and 1 reduce task,
>>>>>> and
>>>>>> the job finishes after 12 Sec. Whenever I extend the number of records
>>>>>> in
>>>>>> the htable to 10,000 I still have 1 map and 1 reduce task and the job
>>>>>> finishes after 1 hour!!!!!!
>>>>>> The mapper is incredibly slow, what is so heavy in my code?
>>>>>>
>>>>>
>>>>> From your code, it looks like you are using the HBase records as input
>>>>> for the mapper. Then, for each record, you go through the entire table
>>>>> again, so you do N scans of the HBase table, and read in total N*N records.
>>>>> That's what's heavy in your code.
>>>>>
>>>>> Mathias
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Why the map input records are equal to the map output records

Posted by Mathias De Maré <ma...@gmail.com>.
Please send your reply to the hbase mailing list as well.

On Wed, Aug 12, 2009 at 3:37 PM, Xine Jar <xi...@googlemail.com> wrote:

> Your proposed architecture is clear. You try to split the calculation load
> between the
> mapper and the reducer, and even creating another job for the final
> calculation.
>
> but if I shall keep reading in the mapper the way I am doing it know, this
> means that in order to sort all the temperature values, I still need to read
> the table N*N times where N is the number of records in a table? Right? So
> my initial problem persists!!! Isn't it possible to read the Table only
> once?


Yes, that's possible, and that's what I proposed.
The map function gets called for each record.
Your map function could be changed to something simple, like (pseudocode):

map(key, value) {
output.collect(key.temperature, value);
}

Then a reduce function (the key is the temperature you passed in the map
function):
reduce(key, List values) {
calculate_average();
}

And then a second job, like I mentioned.

Mathias


>
>
> For example if I would be using a textfile as an input instead of an hbase
> table, the standard way to do, is copying the file to a string, tokenize it,
> GO THROUGH IT ONCE  and filter out whatever I want!!
>
> A similar approach,going through the table only once, is not possible with
> and Hbase table ?!
>
>
>
> Regards,
> CJ
>
>
> 2009/8/12 Mathias De Maré <ma...@gmail.com>
>
>> Well, I think you could probably take care of the issue by using a
>> somewhat different architecture.
>>
>> If I understand correctly, you take all of the values with the same
>> temperature together. This is in fact a Reduce operation.
>>
>> You could structure as follows:
>> -Read in like you do now, but make your Map simpler. For each map (so for
>> each record), write away the temperature as the key, and the record as a
>> value.
>> -Each reducer will then have a list of records, each with the same
>> temperature. You can sum the entries in the list and write everything away.
>> Then you will have 1 combined result per temperature.
>>
>> You could then start a second job that has a pass-through Mapper, and then
>> do your final calculation in the Reducer.
>>
>> Does it sound like I'm making sense to some degree? :-)
>>
>> Mathias
>>
>>
>> On Wed, Aug 12, 2009 at 2:38 PM, Xine Jar <xi...@googlemail.com>wrote:
>>
>>> Aha!! I understand!!
>>> So basically this is the reason why I am getting 100 written Map output
>>> records. Because the mapper is calling the collect() of the OutputCollector
>>> 100 times= number of records in the table.
>>>
>>> In this case I assume I have to pass the HBASE table instead of the
>>> records as an input to the mapper right? Is there such a Java example you
>>> could point it out for me?
>>>
>>> Regards,
>>> CJ
>>>
>>>
>>> 2009/8/12 Mathias De Maré <ma...@gmail.com>
>>>
>>> Hi,
>>>>
>>>> On Tue, Aug 11, 2009 at 6:27 PM, Xine Jar <xi...@googlemail.com>wrote:
>>>>>
>>>>> *A snapshot of the Mapper :*
>>>>>
>>>>> *public void map(ImmutableBytesWritable key,RowResult value,
>>>>> OutputCollector<Text, Text> output, Reporter reporter) throws
>>>>> IOException {
>>>>>       double numberreadings=0;
>>>>>       double sumreadings=0;
>>>>>
>>>>>        if(table==null)
>>>>>          throw new IOException("table is null");
>>>>>
>>>>>       //set a scanner
>>>>>         Scanner scanner=table.getScanner(new String[] {"cf:Value",
>>>>> "cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude",
>>>>> "cf:SensorNode"});
>>>>>         RowResult rowresult=scanner.next();
>>>>>
>>>>>      //scanning the table, filtering out the values, and count them
>>>>>        while(rowresult!=null){
>>>>>
>>>>>         String stringtype= new
>>>>> String((rowresult.get(Bytes.toBytes("cf:Type"))).getValue());
>>>>>
>>>>>         if((stringtype).equals("temperature")==true)
>>>>>            ///summ the correct reading value
>>>>>            {String stringval=new
>>>>> String((rowresult.get(Bytes.toBytes("cf:Value"))).getValue());
>>>>>             double doubleval=Double.parseDouble(stringval.trim());
>>>>>             sumreadings=sumreadings+doubleval;
>>>>>
>>>>>             ///summ the number of readings
>>>>>             numberreadings=numberreadings+1;
>>>>>            }
>>>>>          rowresult=scanner.next();
>>>>>
>>>>>         }
>>>>>
>>>>>        scanner.close();
>>>>>
>>>>>       //send the summ of the values as well as the number
>>>>>        String strsumreadings=Double.toString(sumreadings);
>>>>>        String strnumberreadings=Double.toString(numberreadings);
>>>>>        String strmapoutvalue= strsumreadings+" "+strnumberreadings;
>>>>>
>>>>>        mapoutputvalue.set(strmapoutvalue);
>>>>>        output.collect(mapoutputkey,mapoutputvalue);
>>>>>
>>>>>  }*
>>>>>
>>>>>
>>>>> *Questions:*
>>>>> 1-For 100 records, I noticed that I have 1 map task and 1 reduce task,
>>>>> and
>>>>> the job finishes after 12 Sec. Whenever I extend the number of records
>>>>> in
>>>>> the htable to 10,000 I still have 1 map and 1 reduce task and the job
>>>>> finishes after 1 hour!!!!!!
>>>>> The mapper is incredibly slow, what is so heavy in my code?
>>>>>
>>>>
>>>> From your code, it looks like you are using the HBase records as input
>>>> for the mapper. Then, for each record, you go through the entire table
>>>> again, so you do N scans of the HBase table, and read in total N*N records.
>>>> That's what's heavy in your code.
>>>>
>>>> Mathias
>>>>
>>>>
>>>
>>
>

Re: Why the map input records are equal to the map output records

Posted by Mathias De Maré <ma...@gmail.com>.
Well, I think you could probably take care of the issue by using a somewhat
different architecture.

If I understand correctly, you take all of the values with the same
temperature together. This is in fact a Reduce operation.

You could structure as follows:
-Read in like you do now, but make your Map simpler. For each map (so for
each record), write away the temperature as the key, and the record as a
value.
-Each reducer will then have a list of records, each with the same
temperature. You can sum the entries in the list and write everything away.
Then you will have 1 combined result per temperature.

You could then start a second job that has a pass-through Mapper, and then
do your final calculation in the Reducer.

Does it sound like I'm making sense to some degree? :-)

Mathias

On Wed, Aug 12, 2009 at 2:38 PM, Xine Jar <xi...@googlemail.com> wrote:

> Aha!! I understand!!
> So basically this is the reason why I am getting 100 written Map output
> records. Because the mapper is calling the collect() of the OutputCollector
> 100 times= number of records in the table.
>
> In this case I assume I have to pass the HBASE table instead of the records
> as an input to the mapper right? Is there such a Java example you could
> point it out for me?
>
> Regards,
> CJ
>
>
> 2009/8/12 Mathias De Maré <ma...@gmail.com>
>
> Hi,
>>
>> On Tue, Aug 11, 2009 at 6:27 PM, Xine Jar <xi...@googlemail.com>wrote:
>>>
>>> *A snapshot of the Mapper :*
>>>
>>> *public void map(ImmutableBytesWritable key,RowResult value,
>>> OutputCollector<Text, Text> output, Reporter reporter) throws IOException
>>> {
>>>       double numberreadings=0;
>>>       double sumreadings=0;
>>>
>>>        if(table==null)
>>>          throw new IOException("table is null");
>>>
>>>       //set a scanner
>>>         Scanner scanner=table.getScanner(new String[] {"cf:Value",
>>> "cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude",
>>> "cf:SensorNode"});
>>>         RowResult rowresult=scanner.next();
>>>
>>>      //scanning the table, filtering out the values, and count them
>>>        while(rowresult!=null){
>>>
>>>         String stringtype= new
>>> String((rowresult.get(Bytes.toBytes("cf:Type"))).getValue());
>>>
>>>         if((stringtype).equals("temperature")==true)
>>>            ///summ the correct reading value
>>>            {String stringval=new
>>> String((rowresult.get(Bytes.toBytes("cf:Value"))).getValue());
>>>             double doubleval=Double.parseDouble(stringval.trim());
>>>             sumreadings=sumreadings+doubleval;
>>>
>>>             ///summ the number of readings
>>>             numberreadings=numberreadings+1;
>>>            }
>>>          rowresult=scanner.next();
>>>
>>>         }
>>>
>>>        scanner.close();
>>>
>>>       //send the summ of the values as well as the number
>>>        String strsumreadings=Double.toString(sumreadings);
>>>        String strnumberreadings=Double.toString(numberreadings);
>>>        String strmapoutvalue= strsumreadings+" "+strnumberreadings;
>>>
>>>        mapoutputvalue.set(strmapoutvalue);
>>>        output.collect(mapoutputkey,mapoutputvalue);
>>>
>>>  }*
>>>
>>>
>>> *Questions:*
>>> 1-For 100 records, I noticed that I have 1 map task and 1 reduce task,
>>> and
>>> the job finishes after 12 Sec. Whenever I extend the number of records in
>>> the htable to 10,000 I still have 1 map and 1 reduce task and the job
>>> finishes after 1 hour!!!!!!
>>> The mapper is incredibly slow, what is so heavy in my code?
>>>
>>
>> From your code, it looks like you are using the HBase records as input for
>> the mapper. Then, for each record, you go through the entire table again, so
>> you do N scans of the HBase table, and read in total N*N records. That's
>> what's heavy in your code.
>>
>> Mathias
>>
>>
>

Re: Why the map input records are equal to the map output records

Posted by Mathias De Maré <ma...@gmail.com>.
Hi,

On Tue, Aug 11, 2009 at 6:27 PM, Xine Jar <xi...@googlemail.com> wrote:
>
> *A snapshot of the Mapper :*
> *public void map(ImmutableBytesWritable key,RowResult value,
> OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
>       double numberreadings=0;
>       double sumreadings=0;
>
>        if(table==null)
>          throw new IOException("table is null");
>
>       //set a scanner
>         Scanner scanner=table.getScanner(new String[] {"cf:Value",
> "cf:Type", "cf:TimeStamp", "cf:Latitude", "cf:Longitude",
> "cf:SensorNode"});
>         RowResult rowresult=scanner.next();
>
>      //scanning the table, filtering out the values, and count them
>        while(rowresult!=null){
>
>         String stringtype= new
> String((rowresult.get(Bytes.toBytes("cf:Type"))).getValue());
>
>         if((stringtype).equals("temperature")==true)
>            ///summ the correct reading value
>            {String stringval=new
> String((rowresult.get(Bytes.toBytes("cf:Value"))).getValue());
>             double doubleval=Double.parseDouble(stringval.trim());
>             sumreadings=sumreadings+doubleval;
>
>             ///summ the number of readings
>             numberreadings=numberreadings+1;
>            }
>          rowresult=scanner.next();
>
>         }
>
>        scanner.close();
>
>       //send the summ of the values as well as the number
>        String strsumreadings=Double.toString(sumreadings);
>        String strnumberreadings=Double.toString(numberreadings);
>        String strmapoutvalue= strsumreadings+" "+strnumberreadings;
>
>        mapoutputvalue.set(strmapoutvalue);
>        output.collect(mapoutputkey,mapoutputvalue);
>
>  }*
>
>
> *Questions:*
> 1-For 100 records, I noticed that I have 1 map task and 1 reduce task, and
> the job finishes after 12 Sec. Whenever I extend the number of records in
> the htable to 10,000 I still have 1 map and 1 reduce task and the job
> finishes after 1 hour!!!!!!
> The mapper is incredibly slow, what is so heavy in my code?


>From your code, it looks like you are using the HBase records as input for
the mapper. Then, for each record, you go through the entire table again, so
you do N scans of the HBase table, and read in total N*N records. That's
what's heavy in your code.

Mathias