You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hive.apache.org by Charles A <ca...@gmail.com> on 2009/09/29 16:38:23 UTC

Join failing on reduce step with OutOfMemoryError

Hi guys,
First of all: I'm new around here, so hello! Good work on Hive - it's a
great tool.

Now, to the question:

I am performing a join between 6 tables and keep experiencing OutOfMemoryErrors
on the reduce task at a certain point. Here is the part of the log:

2009-09-29 10:09:53,554 INFO ExecReducer:
<JOIN>Id =4
  <Children>
    <FS>Id =5
      <Parent>Id = 4 <\Parent>
    <\FS>
  <\Children>
<\JOIN>
2009-09-29 10:09:53,554 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: Initializing Self 4 JOIN
2009-09-29 10:09:53,555 INFO
org.apache.hadoop.hive.ql.exec.CommonJoinOperator: COMMONJOIN
struct<key:struct<joinkey0:int>,value:struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string>,alias:tinyint>
2009-09-29 10:09:53,562 INFO
org.apache.hadoop.hive.ql.exec.CommonJoinOperator: JOIN
struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string,_col52:string>
totalsz = 6
2009-09-29 10:09:53,562 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: Operator 4 JOIN
initialized
2009-09-29 10:09:53,562 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: Initializing children of
4 JOIN
2009-09-29 10:09:53,562 INFO
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing child 5
FS
2009-09-29 10:09:53,562 INFO
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing Self 5
FS
2009-09-29 10:09:53,563 INFO
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file:
FS hdfs://h1:9000/tmp/hive-h1/601475416/_tmp.10003/_tmp.attempt_200909290917_0007_r_000003_1
2009-09-29 10:09:53,586 INFO
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Operator 5 FS
initialized
2009-09-29 10:09:53,586 INFO
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initialization Done 5
FS
2009-09-29 10:09:53,586 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: Initialization Done 4
JOIN
2009-09-29 10:09:53,588 INFO ExecReducer: ExecReducer: processing 1
rows: used memory = 6227064
2009-09-29 10:09:53,589 INFO ExecReducer: ExecReducer: processing 10
rows: used memory = 6227064
2009-09-29 10:09:53,594 INFO ExecReducer: ExecReducer: processing 100
rows: used memory = 6227064
2009-09-29 10:09:53,637 INFO ExecReducer: ExecReducer: processing 1000
rows: used memory = 6601472
2009-09-29 10:09:53,638 WARN
org.apache.hadoop.hive.ql.exec.CommonJoinOperator: table 0 has 1000
rows for join key [355]
2009-09-29 10:09:53,832 INFO ExecReducer: ExecReducer: processing
10000 rows: used memory = 5574832
2009-09-29 10:09:54,277 INFO ExecReducer: ExecReducer: processing
100000 rows: used memory = 40277000
2009-09-29 10:10:02,907 INFO ExecReducer: ExecReducer: processing
1000000 rows: used memory = 371319768

The following is on stderr:

Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError:
Java heap space
	at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)

A slightly different error from another task:

2009-09-29 10:15:12,849 INFO org.apache.hadoop.mapred.TaskRunner:
Communication exception: java.lang.OutOfMemoryError: Java heap space
	at java.util.ArrayList.iterator(ArrayList.java:737)
	at org.apache.hadoop.mapred.Task.updateCounters(Task.java:612)
	at org.apache.hadoop.mapred.Task.access$300(Task.java:56)
	at org.apache.hadoop.mapred.Task$1.run(Task.java:413)
	at java.lang.Thread.run(Thread.java:636)


I have tried:
- Setting a bigger heap size -- mapred.child.java.opts=-Xmx2048m, etc... up
to 8g. The tasks fail the same way but will process more rows before
failing.
- Forcing it to use more reduce tasks -- mapred.reduce.tasks=16, etc.
- Rearranging the join order

None of these have fixed the problem.

Here is the basic query:

-------

INSERT OVERWRITE DIRECTORY '/user/hadoop/query-out'

SELECT
    P.created_at,
    A.name,
    P.job_id,
    LV.value,
    PR.zip as pr_zip,
    L.zip as l_zip
FROM
    L
    join LV on L.id = LV.L_id and LV.attribute_id = 352
    join P on L.id = P.L_id
    join JA on JA.job_id = P.job_id
    join A on A.id = JA.A_id
    left outer join PR on PR.P_id = P.id

-------

For reference, number of records per table:

L: 2.5 million
P: 55 million
JA: 40
A: 15
PR: 10 million
LV: 25 million

I think it is failing on the join between JA and A, given that the errors
tend to appear after it logs information about a key that is a job_id  (e.g.
"table 0 has 1000 rows for join key [355]") .

I am using Hive trunk (r819913) and Hadoop 0.19.2.

Any help is greatly appreciated.

Many thanks,
- Charles

Re: Join failing on reduce step with OutOfMemoryError

Posted by Abhijit Pol <ap...@rocketfuelinc.com>.
Small clarification to Jason's apt comment.

I think its all values for a given join key from LHS get cached and all
values for a given join key from right most table are streamed. Basically
this is how reducer works. You get an iterator with all values from all
tables for a given join key. You then iterate over values and cache them all
except for the last table. Continue iterating over values from last table
and flush the join results.

So you want to keep the table with most number of values for join key right
most (generally a fact table).

Please correct me if I got it wrong :-)

Thanks,
Abhi


On Tue, Sep 29, 2009 at 7:51 AM, Jason Michael <jm...@videoegg.com>wrote:

>  Hi Charles,
>
> One thing to keep in mind is that you should always keep the larger tables
> on the right hand side of the join.  It is my understanding that the LHS of
> a join gets put into memory and the RHS passed through the reducers.  If
> your LHS is a large table, you will run into memory problems.  In a complex
> query such as yours with multiple joins, you may need to make use of
> subqueries in order to stick with this rule.
>
> Hope that helps,
> Jason
>
>
>
> On 9/29/09 7:38 AM, "Charles A" <ca...@gmail.com> wrote:
>
> Hi guys,
>
> First of all: I'm new around here, so hello! Good work on Hive - it's a
> great tool.
>
> Now, to the question:
>
> I am performing a join between 6 tables and keep experiencing OutOfMemoryErrors
> on the reduce task at a certain point. Here is the part of the log:
> 2009-09-29 10:09:53,554 INFO ExecReducer:
> <JOIN>Id =4
>   <Children>
>     <FS>Id =5
>       <Parent>Id = 4 <\Parent>
>     <\FS>
>   <\Children>
> <\JOIN>
> 2009-09-29 10:09:53,554 INFO org.apache.hadoop.hive.ql.exec.JoinOperator:
> Initializing Self 4 JOIN
> 2009-09-29 10:09:53,555 INFO
> org.apache.hadoop.hive.ql.exec.CommonJoinOperator: COMMONJOIN
> struct<key:struct<joinkey0:int>,value:struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string>,alias:tinyint>
> 2009-09-29 10:09:53,562 INFO
> org.apache.hadoop.hive.ql.exec.CommonJoinOperator: JOIN
> struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string,_col52:string>
> totalsz = 6
> 2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.JoinOperator:
> Operator 4 JOIN initialized
> 2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.JoinOperator:
> Initializing children of 4 JOIN
> 2009-09-29 10:09:53,562 INFO
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing child 5 FS
> 2009-09-29 10:09:53,562 INFO
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing Self 5 FS
> 2009-09-29 10:09:53,563 INFO
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS
> hdfs://h1:9000/tmp/hive-h1/601475416/_tmp.10003/_tmp.attempt_200909290917_0007_r_000003_1
> 2009-09-29 10:09:53,586 INFO
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Operator 5 FS initialized
> 2009-09-29 10:09:53,586 INFO
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initialization Done 5 FS
> 2009-09-29 10:09:53,586 INFO org.apache.hadoop.hive.ql.exec.JoinOperator:
> Initialization Done 4 JOIN
> 2009-09-29 10:09:53,588 INFO ExecReducer: ExecReducer: processing 1 rows:
> used memory = 6227064
> 2009-09-29 10:09:53,589 INFO ExecReducer: ExecReducer: processing 10 rows:
> used memory = 6227064
> 2009-09-29 10:09:53,594 INFO ExecReducer: ExecReducer: processing 100 rows:
> used memory = 6227064
> 2009-09-29 10:09:53,637 INFO ExecReducer: ExecReducer: processing 1000
> rows: used memory = 6601472
> 2009-09-29 10:09:53,638 WARN
> org.apache.hadoop.hive.ql.exec.CommonJoinOperator: table 0 has 1000 rows for
> join key [355]
> 2009-09-29 10:09:53,832 INFO ExecReducer: ExecReducer: processing 10000
> rows: used memory = 5574832
> 2009-09-29 10:09:54,277 INFO ExecReducer: ExecReducer: processing 100000
> rows: used memory = 40277000
> 2009-09-29 10:10:02,907 INFO ExecReducer: ExecReducer: processing 1000000
> rows: used memory = 371319768
> The following is on stderr:
> Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java
> heap space
>  at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
> A slightly different error from another task:
> 2009-09-29 10:15:12,849 INFO org.apache.hadoop.mapred.TaskRunner:
> Communication exception: java.lang.OutOfMemoryError: Java heap space
>  at java.util.ArrayList.iterator(ArrayList.java:737)
>  at org.apache.hadoop.mapred.Task.updateCounters(Task.java:612)
>  at org.apache.hadoop.mapred.Task.access$300(Task.java:56)
>  at org.apache.hadoop.mapred.Task$1.run(Task.java:413)
>  at java.lang.Thread.run(Thread.java:636)
>
> I have tried:
> - Setting a bigger heap size -- mapred.child.java.opts=-Xmx2048m, etc... up
> to 8g. The tasks fail the same way but will process more rows before
> failing.
> - Forcing it to use more reduce tasks -- mapred.reduce.tasks=16, etc.
> - Rearranging the join order
>
> None of these have fixed the problem.
>
> Here is the basic query:
>
> -------
>
> INSERT OVERWRITE DIRECTORY '/user/hadoop/query-out'
>
> SELECT
>     P.created_at,
>     A.name,
>     P.job_id,
>     LV.value,
>     PR.zip as pr_zip,
>     L.zip as l_zip
> FROM
>     L
>     join LV on L.id = LV.L_id and LV.attribute_id = 352
>     join P on L.id = P.L_id
>     join JA on JA.job_id = P.job_id
>     join A on A.id = JA.A_id
>     left outer join PR on PR.P_id = P.id
>
> -------
>
> For reference, number of records per table:
>
> L: 2.5 million
> P: 55 million
> JA: 40
> A: 15
> PR: 10 million
> LV: 25 million
>
> I think it is failing on the join between JA and A, given that the errors
> tend to appear after it logs information about a key that is a job_id  (e.g.
> "table 0 has 1000 rows for join key [355]") .
>
> I am using Hive trunk (r819913) and Hadoop 0.19.2.
>
> Any help is greatly appreciated.
>
> Many thanks,
> - Charles
>
>

Re: Join failing on reduce step with OutOfMemoryError

Posted by Jason Michael <jm...@videoegg.com>.
Hi Charles,

One thing to keep in mind is that you should always keep the larger tables on the right hand side of the join.  It is my understanding that the LHS of a join gets put into memory and the RHS passed through the reducers.  If your LHS is a large table, you will run into memory problems.  In a complex query such as yours with multiple joins, you may need to make use of subqueries in order to stick with this rule.

Hope that helps,
Jason


On 9/29/09 7:38 AM, "Charles A" <ca...@gmail.com> wrote:

Hi guys,

First of all: I'm new around here, so hello! Good work on Hive - it's a great tool.

Now, to the question:

I am performing a join between 6 tables and keep experiencing OutOfMemoryErrors on the reduce task at a certain point. Here is the part of the log:
2009-09-29 10:09:53,554 INFO ExecReducer:
<JOIN>Id =4
  <Children>
    <FS>Id =5
      <Parent>Id = 4 <\Parent>
    <\FS>
  <\Children>
<\JOIN>
2009-09-29 10:09:53,554 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: Initializing Self 4 JOIN
2009-09-29 10:09:53,555 INFO org.apache.hadoop.hive.ql.exec.CommonJoinOperator: COMMONJOIN struct<key:struct<joinkey0:int>,value:struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string>,alias:tinyint>
2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.CommonJoinOperator: JOIN struct<_col0:int,_col2:int,_col5:string,_col13:string,_col35:string,_col52:string> totalsz = 6
2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: Operator 4 JOIN initialized
2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: Initializing children of 4 JOIN
2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing child 5 FS
2009-09-29 10:09:53,562 INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initializing Self 5 FS
2009-09-29 10:09:53,563 INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS hdfs://h1:9000/tmp/hive-h1/601475416/_tmp.10003/_tmp.attempt_200909290917_0007_r_000003_1
2009-09-29 10:09:53,586 INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: Operator 5 FS initialized
2009-09-29 10:09:53,586 INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: Initialization Done 5 FS
2009-09-29 10:09:53,586 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: Initialization Done 4 JOIN
2009-09-29 10:09:53,588 INFO ExecReducer: ExecReducer: processing 1 rows: used memory = 6227064
2009-09-29 10:09:53,589 INFO ExecReducer: ExecReducer: processing 10 rows: used memory = 6227064
2009-09-29 10:09:53,594 INFO ExecReducer: ExecReducer: processing 100 rows: used memory = 6227064
2009-09-29 10:09:53,637 INFO ExecReducer: ExecReducer: processing 1000 rows: used memory = 6601472
2009-09-29 10:09:53,638 WARN org.apache.hadoop.hive.ql.exec.CommonJoinOperator: table 0 has 1000 rows for join key [355]
2009-09-29 10:09:53,832 INFO ExecReducer: ExecReducer: processing 10000 rows: used memory = 5574832
2009-09-29 10:09:54,277 INFO ExecReducer: ExecReducer: processing 100000 rows: used memory = 40277000
2009-09-29 10:10:02,907 INFO ExecReducer: ExecReducer: processing 1000000 rows: used memory = 371319768
The following is on stderr:
Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
 at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
A slightly different error from another task:
2009-09-29 10:15:12,849 INFO org.apache.hadoop.mapred.TaskRunner: Communication exception: java.lang.OutOfMemoryError: Java heap space
 at java.util.ArrayList.iterator(ArrayList.java:737)
 at org.apache.hadoop.mapred.Task.updateCounters(Task.java:612)
 at org.apache.hadoop.mapred.Task.access$300(Task.java:56)
 at org.apache.hadoop.mapred.Task$1.run(Task.java:413)
 at java.lang.Thread.run(Thread.java:636)

I have tried:
- Setting a bigger heap size -- mapred.child.java.opts=-Xmx2048m, etc... up to 8g. The tasks fail the same way but will process more rows before failing.
- Forcing it to use more reduce tasks -- mapred.reduce.tasks=16, etc.
- Rearranging the join order

None of these have fixed the problem.

Here is the basic query:

-------

INSERT OVERWRITE DIRECTORY '/user/hadoop/query-out'

SELECT
    P.created_at,
    A.name,
    P.job_id,
    LV.value,
    PR.zip as pr_zip,
    L.zip as l_zip
FROM
    L
    join LV on L.id = LV.L_id and LV.attribute_id = 352
    join P on L.id = P.L_id
    join JA on JA.job_id = P.job_id
    join A on A.id = JA.A_id
    left outer join PR on PR.P_id = P.id

-------

For reference, number of records per table:

L: 2.5 million
P: 55 million
JA: 40
A: 15
PR: 10 million
LV: 25 million

I think it is failing on the join between JA and A, given that the errors tend to appear after it logs information about a key that is a job_id  (e.g. "table 0 has 1000 rows for join key [355]") .

I am using Hive trunk (r819913) and Hadoop 0.19.2.

Any help is greatly appreciated.

Many thanks,
- Charles