Posted to user@hive.apache.org by Namit Jain <nj...@facebook.com> on 2009/07/07 01:09:07 UTC

RE: Combined data more throughput

The concatenation is hanging for you.
How much data is there after the filter?

The number of reducers is a function of the above size - by default 1G/reducer.

To increase the number of reducers, set hive.merge.size.per.mapper to a lower value.


    HIVEMERGEMAPFILESSIZE("hive.merge.size.per.mapper", (long)(1000*1000*1000)),
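
For example, to lower it from the Hive CLI before running the query (a minimal sketch; the value below is illustrative, not a recommendation):

    -- value is in bytes; aim for roughly 250MB per merge task instead of the ~1GB default
    set hive.merge.size.per.mapper=250000000;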


But 1 reducer means there was at most about 1G of output data - it may not be hanging, but proceeding slowly,
since it has to read from all ~4000-odd mappers.


Increasing the split size may lead to fewer mappers to start with, which should help.

-namit


-----Original Message-----
From: Edward Capriolo [mailto:edlinuxguru@gmail.com] 
Sent: Monday, July 06, 2009 1:05 PM
To: Namit Jain
Subject: Combined data more throughput

Namit,

Sorry, I could not reply to this on list. I took the latest trunk and
now my query is failing. Let me know what you think.

Table 1:
CREATE TABLE raw_web_data ( log_date STRING,
log_time STRING,
remote_ip STRING,
...
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\037'
LINES TERMINATED BY '\012'

Table 2
CREATE TABLE raw_web_data_seq ( log_date STRING,
log_time STRING,
remote_ip STRING,
...
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\037'
LINES TERMINATED BY '\012'
STORED AS SEQUENCEFILE

My problem may be that my number of reducers seems frozen at 1, even
though I have set it using set mapred.reduce.tasks=50.

The first stage completes. My second stage is hung at 0%.
Stage 2 reduce logs:
2009-07-06 15:52:41,757 INFO org.apache.hadoop.mapred.ReduceTask:
Ignoring obsolete output of FAILED map-task:
'attempt_200905271425_1427_m_002111_0'
2009-07-06 15:52:41,757 INFO org.apache.hadoop.mapred.ReduceTask:
Ignoring obsolete output of FAILED map-task:
'attempt_200905271425_1427_m_001614_1'
2009-07-06 15:52:41,757 INFO org.apache.hadoop.mapred.ReduceTask:
Ignoring obsolete output of FAILED map-task:
'attempt_200905271425_1427_m_001179_1'
2009-07-06 15:52:41,758 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200905271425_1427_r_000000_0: Got 42 obsolete map-outputs from
tasktracker

I am also seeing this in the mapper...

2009-07-06 15:27:33,181 INFO
org.apache.hadoop.hive.ql.exec.MapOperator: Initialization Done 5
2009-07-06 15:27:33,212 WARN
org.apache.hadoop.hive.serde2.lazy.LazyStruct: Extra bytes detected at
the end of the row! Ignoring similar problems.
2009-07-06 15:27:33,408 INFO
org.apache.hadoop.hive.ql.exec.MapOperator: DESERIALIZE_ERRORS:0

hive> explain INSERT OVERWRITE TABLE raw_web_data_seq PARTITION
(log_date_part='2009-07-05')
    > select log_date,log_time,remote_ip,remote_login,server_name,local_ip,dash,request_method,
request_url,query_string,http_status,literal0,bytes_sent,cont_length,time_to_serve,user_agent,cookie,referer
    > from raw_web_data  where log_date_part='2009-07-05';
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF raw_web_data)) (TOK_INSERT
(TOK_DESTINATION (TOK_TAB raw_web_data_seq (TOK_PARTSPEC (TOK_PARTVAL
log_date_part '2009-07-05')))) (TOK_SELECT (TOK_SELEXPR
(TOK_TABLE_OR_COL log_date)) (TOK_SELEXPR (TOK_TABLE_OR_COL log_time))
(TOK_SELEXPR (TOK_TABLE_OR_COL remote_ip)) (TOK_SELEXPR
(TOK_TABLE_OR_COL remote_login)) (TOK_SELEXPR (TOK_TABLE_OR_COL
server_name)) (TOK_SELEXPR (TOK_TABLE_OR_COL local_ip)) (TOK_SELEXPR
(TOK_TABLE_OR_COL dash)) (TOK_SELEXPR (TOK_TABLE_OR_COL
request_method)) (TOK_SELEXPR (TOK_TABLE_OR_COL request_url))
(TOK_SELEXPR (TOK_TABLE_OR_COL query_string)) (TOK_SELEXPR
(TOK_TABLE_OR_COL http_status)) (TOK_SELEXPR (TOK_TABLE_OR_COL
literal0)) (TOK_SELEXPR (TOK_TABLE_OR_COL bytes_sent)) (TOK_SELEXPR
(TOK_TABLE_OR_COL cont_length)) (TOK_SELEXPR (TOK_TABLE_OR_COL
time_to_serve)) (TOK_SELEXPR (TOK_TABLE_OR_COL user_agent))
(TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR (TOK_TABLE_OR_COL
referer))) (TOK_WHERE (= (TOK_TABLE_OR_COL log_date_part)
'2009-07-05'))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-4 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-4

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        raw_web_data
            Filter Operator
              predicate:
                  expr: (log_date_part = '2009-07-05')
                  type: boolean
              Filter Operator
                predicate:
                    expr: (log_date_part = '2009-07-05')
                    type: boolean
                Select Operator
                  expressions:
                        expr: log_date
                        type: string
                        expr: log_time
                        type: string
                        expr: remote_ip
                        type: string
                        expr: remote_login
                        type: string
                        expr: server_name
                        type: string
                        expr: local_ip
                        type: string
                        expr: dash
                        type: string
                        expr: request_method
                        type: string
                        expr: request_url
                        type: string
                        expr: query_string
                        type: string
                        expr: http_status
                        type: int
                        expr: literal0
                        type: string
                        expr: bytes_sent
                        type: int
                        expr: cont_length
                        type: int
                        expr: time_to_serve
                        type: int
                        expr: user_agent
                        type: string
                        expr: cookie
                        type: string
                        expr: referer
                        type: string
                  File Output Operator
                    compressed: false
                    GlobalTableId: 1
                    table:
                        input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        name: raw_web_data_seq

  Stage: Stage-4
    Conditional Operator
      list of dependent Tasks:
          Move Operator
            files:
                hdfs directory: true
                destination:
hdfs://nyhadoopname1.ops.edwardcapriolo.com:8020/tmp/hive-ecapriolo/1248574254/10000
          Map Reduce
            Alias -> Map Operator Tree:
              hdfs://nyhadoopname1.ops.edwardcapriolo.com:8020/tmp/hive-ecapriolo/1696831282/10002
                  Reduce Output Operator
                    sort order:
                    Map-reduce partition columns:
                          expr: rand()
                          type: double
                    tag: -1
                    value expressions:
                          expr: log_date
                          type: string
                          expr: log_time
                          type: string
                          expr: remote_ip
                          type: string
                          expr: remote_login
                          type: string
                          expr: server_name
                          type: string
                          expr: local_ip
                          type: string
                          expr: dash
                          type: string
                          expr: request_method
                          type: string
                          expr: request_url
                          type: string
                          expr: query_string
                          type: string
                          expr: http_status
                          type: int
                          expr: literal0
                          type: string
                          expr: bytes_sent
                          type: int
                          expr: cont_length
                          type: int
                          expr: time_to_serve
                          type: int
                          expr: user_agent
                          type: string
                          expr: cookie
                          type: string
                          expr: referer
                          type: string
            Reduce Operator Tree:
              Extract
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      name: raw_web_data_seq

  Stage: Stage-0
    Move Operator
      tables:
          partition:
            log_date_part 2009-07-05
          replace: true
          table:
              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
              output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: raw_web_data_seq



On Mon, Jul 6, 2009 at 2:01 PM, Namit Jain<nj...@facebook.com> wrote:
> hive.merge.mapfiles is set to true by default.
> So, in trunk, you should get small output files.
> Can you do a explain plan and send it if that is not the case ?
>
> -----Original Message-----
> From: Ashish Thusoo [mailto:athusoo@facebook.com]
> Sent: Monday, July 06, 2009 10:50 AM
> To: hive-user@hadoop.apache.org
> Subject: RE: Combine data for more throughput
>
> Namit recently added a facility to concatenate the files. The problem here is that the filter is running in the mapper.
>
> In trunk if you set
>
> set hive.merge.mapfiles=true
>
> That should do the trick.
>
> In 0.3.0 you can send the output of the select to an Identity Reducer to get the same effect by using the REDUCE syntax (sketched below, after the quoted messages).
>
> Ashish
> ________________________________________
> From: Edward Capriolo [edlinuxguru@gmail.com]
> Sent: Monday, July 06, 2009 9:47 AM
> To: hive-user@hadoop.apache.org
> Subject: Combine data for more throughput
>
> I am currently pulling our 5-minute logs into a Hive table. This
> results in a partition with ~4,000 tiny files in text format, about 4MB
> per file, per day.
>
> I have created a table with an identical set of columns, with
> 'STORED AS SEQUENCEFILE'. My goal is to use SequenceFile and merge
> the smaller files into larger files. This should put less stress on my
> name node and give better performance. I am doing this:
>
> INSERT OVERWRITE TABLE raw_web_data_seq PARTITION (log_date_part='2009-07-05')
> select col1,col2...
> from raw_web_data  where log_date_part='2009-07-05';
>
> This does not do what I need, as I end up with about 4000 'attempt'
> files like 'attempt_200905271425_1382_m_004318_0'.
> Does anyone have some tips on transforming raw data into the
> "fastest/best" possible format? Schema tips would be helpful, but I am
> really looking to merge up smaller files and choose a fast format
> (SequenceFile, LZO, whatever).
>
> Thanks
>
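
For reference, here is a minimal sketch of the identity-reducer workaround Ashish mentions above: force the rows through a reduce stage with Hive's REDUCE transform syntax, using /bin/cat as a pass-through script. The column list is trimmed to three columns for readability (a real statement would carry the full column list from the INSERT above), so treat it as a pattern rather than a paste-ready query:

    -- CLUSTER BY forces a reduce stage; '/bin/cat' just passes rows through unchanged
    FROM (
      SELECT log_date, log_time, remote_ip
      FROM raw_web_data
      WHERE log_date_part = '2009-07-05'
      CLUSTER BY log_date
    ) filtered
    INSERT OVERWRITE TABLE raw_web_data_seq PARTITION (log_date_part='2009-07-05')
    REDUCE filtered.log_date, filtered.log_time, filtered.remote_ip
      USING '/bin/cat'
      AS log_date, log_time, remote_ip;

Because the output is written by the reducers, the number of output files is bounded by the reducer count rather than by the ~4000 input files.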

RE: Combined data more throughput

Posted by Namit Jain <nj...@facebook.com>.
 

The reason that we need a lot of mappers is that:

1) The input data size is very large.
2) The number of input files is very large.

 

In order to decrease the number of mappers, set mapred.min.split.size to a big number (like 1000000000, i.e. about 1GB).
The default value is 128MB. If you increase this, the number of mappers will automatically decrease.
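
Concretely, that is a one-line setting in the Hive CLI before running the INSERT (the value simply mirrors the 1GB figure above):

    -- read about 1GB per split so far fewer mappers are launched
    set mapred.min.split.size=1000000000;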


Thanks,
-namit





-----Original Message-----
From: Edward Capriolo [mailto:edlinuxguru@gmail.com] 
Sent: Tuesday, July 07, 2009 8:15 AM
To: hive-user@hadoop.apache.org
Subject: Re: Combined data more throughput

I want to give a very theoretical, non-technical hypothesis as to what
is happening here. I updated my cluster to use the trunk version. I
did confirm that the map-side merging is working. I ran a diff with
map-side merge true and false and saw the conditional task turning on
and off.

In my case the map-side merging is "too little, too late". When I took
Ashish's approach and used a REDUCE script, the reduce task was not
progressing and seemed to time out. Now with the map-side merging, the
conditional merge task is timing out for the same reason. The mapper
or reducer is dealing with the output of 4000 maps, and the overhead is
timing the process out. I tried tuning "hive.merge.size.per.mapper" to
100,000,000 and 10,000,000, but that did not seem to help.

I think the map-side merging is probably great for keeping the 'small
files problem' from happening, but it cannot 'fix' the problem once it
has happened; some point in the process still gets hit with lots of inputs.

I am going to go to the source of the issue and fix the data ingestion
process. Right now, I drop a file per server per five minutes into a
Hive partition. I can use the map phase to merge these files before
they go into the warehouse. I am also thinking of introducing a second
partition based on hour. Each partition might not be too big
(600MB-1GB?), but the extra partitioning will make it easier to
operate on the data.
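
For what it's worth, here is a minimal sketch of what such an hourly-partitioned table could look like; the table name and the trimmed three-column list are made up for illustration, and the delimiter/storage clauses simply mirror the earlier DDL:

    CREATE TABLE raw_web_data_hourly (
      log_date STRING,
      log_time STRING,
      remote_ip STRING
    )
    -- second partition key keeps each partition small and easy to operate on
    PARTITIONED BY (log_date_part STRING, log_hour_part STRING)
    ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\037'
      LINES TERMINATED BY '\012'
    STORED AS SEQUENCEFILE;

Loads would then target both keys, e.g. PARTITION (log_date_part='2009-07-05', log_hour_part='14').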
