Posted to common-commits@hadoop.apache.org by Apache Wiki <wi...@apache.org> on 2010/12/01 01:38:02 UTC

[Hadoop Wiki] Update of "Hive/JoinOptimization" by JohnSichi


The "Hive/JoinOptimization" page has been changed by JohnSichi.
http://wiki.apache.org/hadoop/Hive/JoinOptimization?action=diff&rev1=10&rev2=11

--------------------------------------------------

  
  = 1. Map Join Optimization =
  == 1.1 Using Distributed Cache to Propagate Hashtable File ==
- Previously, when 2 large data tables need to do a join, there will be 2 different Mappers to sort these tables based on the join key and emit an intermediate file, and the Reducer will take the intermediate file as input file and do the real join work. This join mechanism is perfect with two large data size. But if one the join table is small enough to fit into the Mapper’s memory, then there is no need to launch the Reducer. Actually, the Reducer stage is very expensive for the performance because the Map/Reduce framework needs to sort and merge the intermediate files.
+ Previously, when two large tables were joined, two different Mappers sorted the tables on the join key and emitted intermediate files, and the Reducer took those intermediate files as input and did the actual join work. This join mechanism (referred to as Common Join) works well when both tables are large. But if one of the join tables is small enough to fit into the Mapper’s memory, there is no need to launch the Reducer. In fact, the Reduce stage is very expensive because the Map/Reduce framework needs to sort and merge the intermediate files. So the basic idea of Map Join is to hold the small table's data in the Mapper’s memory and do the join work in the Map stage, which saves the Reduce stage. The previous implementation (before optimization) is shown here:
  
  {{attachment:fig1.jpg||height="763px",width="909px"}}
  
- '''Fig 1. The Previous Map Join'''
+ '''Fig 1. The Previous Map Join Implementation'''
  
- So the basic idea of Map Join is to hold the data of small table in Mapper’s memory and do the join work in Map stage, which saves the Reduce stage. As shown in Fig 1, the previous map join operation is not scale for large data because each Mapper will directly read the small table data from HDFS. If the large data file is large enough, there will be thousands of Mapper launched to read different record of this large data file. And thousands of Mapper will read this small table data from HDFS into the memory, which makes the small table to be the performance bottleneck, or sometimes Mapper will get lots of time-out for reading this small file, which may cause the task failed.
+ As Fig 1 above shows, the previous map join implementation does not scale well when the larger table is huge, because each Mapper reads the small table data directly from HDFS. If the larger table is huge, thousands of Mappers will be launched to read different records of it. All of those Mappers will read the small table data from HDFS into their memory, which can make access to the small table the performance bottleneck; sometimes Mappers will also get many time-outs while reading this small file, which may cause the task to fail.
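
The map join idea described above can be illustrated with a minimal in-memory hash join. This is only a sketch, not Hive's implementation: the function name `map_join`, the row-as-dict representation, and the sample data are assumptions made for illustration.

```python
from collections import defaultdict

def map_join(small_rows, big_rows, key):
    """Join two lists of dicts on `key`, holding only the small side in memory."""
    # Build phase: hash the small table by join key (this is what each Mapper holds).
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    # Probe phase: stream the big table and look up each key.
    # No sort, shuffle, or Reduce stage is involved.
    for big in big_rows:
        for small in table.get(big[key], []):
            yield {**small, **big}

# Hypothetical sample data: src1 plays the small table, src2 the big one.
src1 = [{"key": 1, "a": "x"}, {"key": 2, "a": "y"}]
src2 = [{"key": 1, "b": "p"}, {"key": 1, "b": "q"}, {"key": 3, "b": "r"}]
joined = list(map_join(src1, src2, "key"))
```

Only rows whose key appears in both inputs are emitted, which matches inner-join semantics; the cost that remains is getting the small table into every Mapper's memory, which is the problem discussed here.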
+ 
+ 
+ Hive-1641 ([[http://issues.apache.org/jira/browse/HIVE-1641|http://issues.apache.org/jira/browse/HIVE-1641]]) has solved this problem, as shown in Fig 2 below.
  
  {{attachment:fig2.jpg||height="881px",width="1184px"}}
  
  '''Fig 2. The Optimized Map Join'''
  
- Hive-1641 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1641]]) has solved this problem. As shown in Fig2, the basic idea is to create a new task, MapReduce Local Task, before the original Join Map/Reduce Task. This new task will read the small table data from HDFS to in-memory hashtable. After reading, it will serialize the in-memory hashtable into files on disk and compress the hashtable file into a tar file. In next stage, when the MapReduce task is launching, it will put this tar file to Hadoop Distributed Cache, which will populate the tar file to each Mapper’s local disk and decompress the file. So all the Mappers can deserialize the hashtable file back into memory and do the join work as before.
+ The basic idea is to create a new task, the MapReduce Local Task, that runs before the original Join Map/Reduce Task. This new task reads the small table data from HDFS into an in-memory hashtable. After reading, it serializes the in-memory hashtable into files on disk and compresses the hashtable files into a tar file. In the next stage, when the MapReduce task is launched, it puts this tar file into the Hadoop Distributed Cache, which copies the tar file to each Mapper’s local disk and decompresses it. All the Mappers can then deserialize the hashtable file back into memory and do the join work as before.
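
The build, serialize, pack, unpack, and deserialize steps can be sketched as follows. This is an illustration under stated assumptions, not Hive's code: `pickle` stands in for Hive's own hashtable serialization, a gzip-compressed tar stands in for the tar file, and a second temporary directory stands in for a Mapper's local disk as populated by the Distributed Cache. The function names and the file name `hashtable.bin` are hypothetical.

```python
import os
import pickle
import tarfile
import tempfile

def build_hashtable_archive(small_rows, key, work_dir):
    """Local task: hash the small table, dump it to disk, pack it into a tar archive."""
    table = {}
    for row in small_rows:
        table.setdefault(row[key], []).append(row)
    dump_path = os.path.join(work_dir, "hashtable.bin")
    with open(dump_path, "wb") as f:
        pickle.dump(table, f)  # assumption: pickle replaces Hive's serialization
    archive_path = os.path.join(work_dir, "hashtable.tar.gz")
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(dump_path, arcname="hashtable.bin")
    return archive_path

def load_hashtable(archive_path, mapper_dir):
    """Mapper side: unpack the cached archive locally and deserialize the table."""
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(mapper_dir)
    with open(os.path.join(mapper_dir, "hashtable.bin"), "rb") as f:
        return pickle.load(f)

with tempfile.TemporaryDirectory() as work_dir, tempfile.TemporaryDirectory() as mapper_dir:
    rows = [{"key": 1, "v": "a"}, {"key": 1, "v": "b"}]
    archive = build_hashtable_archive(rows, "key", work_dir)
    restored = load_hashtable(archive, mapper_dir)
```

The point of the design is that the small table is read from HDFS once, by the local task, and every Mapper afterwards reads only a local copy.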
  
  Obviously, the Local Task is very memory intensive. So the query processor launches this task in a child JVM, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor monitors its memory usage very carefully. Once the memory usage of the Local Task exceeds a threshold, the Local Task aborts itself and tells the user that the table is too large to hold in memory. Users can change this threshold with '''''set hive.mapjoin.localtask.max.memory.usage = 0.999;'''''
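
The threshold check can be sketched as a loader that compares used memory against a fraction of the available heap while it builds the hashtable. Everything here is hypothetical: `load_small_table`, `LocalTaskAborted`, and the injected `used_bytes` callable are illustrative names, and the real Local Task measures JVM heap usage rather than calling a user-supplied function.

```python
class LocalTaskAborted(Exception):
    """Raised when the hashtable no longer fits under the memory threshold."""

def load_small_table(rows, key, used_bytes, max_bytes, max_usage):
    """Build the in-memory hashtable, aborting once usage exceeds max_usage."""
    table = {}
    for row in rows:
        table.setdefault(row[key], []).append(row)
        # used_bytes() is an injected probe (assumption) returning current usage.
        if used_bytes() / max_bytes > max_usage:
            raise LocalTaskAborted("small table is too large to hold in memory")
    return table

rows = [{"key": k} for k in range(5)]
fake_usage = iter([100, 200, 300, 950, 990])  # pretend usage grows with each row
try:
    load_small_table(rows, "key", lambda: next(fake_usage), max_bytes=1000, max_usage=0.9)
    outcome = "loaded"
except LocalTaskAborted:
    outcome = "aborted"
```

With the simulated readings above, the fourth row pushes usage to 95% of the limit, so the loader aborts instead of running out of memory later.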
  
  == 1.2 Removing JDBM ==
- Previously, Hive uses JDBM ([[http://issues.apache.org/jira/browse/HIVE-1293|http://jdbm.sourceforge.net/]]) as a persistent hashtable. Whenever the in-memory hashtable cannot hold data any more, it will swap the key/value into the JDBM table. However when profiling the Map Join, we found out this JDBM component takes more than 70 % CPU time as shown in Fig3. Also the persistent file JDBM generated is too large to put into the Distributed Cache. For example, if users put 67,000 simple integer key/value pairs into the JDBM, it will generate more 22M hashtable file. So the JDBM is too heavy weight for Map Join and it would better to remove this component from Hive. Map Join is designed for holding the small table's data into memory. If the table is too large to hold, just run as a Common Join. There is no need to use persistent hashtable any more. Hive-1754 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1754]])
+ Previously, Hive used JDBM ([[http://jdbm.sourceforge.net/|http://jdbm.sourceforge.net/]]) as a persistent hashtable. Whenever the in-memory hashtable could not hold any more data, it swapped key/value pairs into the JDBM table. However, when profiling the Map Join, we found that this JDBM component takes more than 70% of CPU time, as shown in Fig 3. Also, the persistent file that JDBM generates is too large to put into the Distributed Cache. For example, if users put 67,000 simple integer key/value pairs into JDBM, it generates a hashtable file of more than 22M. So JDBM is too heavyweight for Map Join, and it is better to remove this component from Hive. Map Join is designed for holding the small table's data in memory. If the table is too large to hold, the query just runs as a Common Join. There is no need to use a persistent hashtable any more. See Hive-1754 ([[http://issues.apache.org/jira/browse/HIVE-1754|http://issues.apache.org/jira/browse/HIVE-1754]]).
  
  {{attachment:fig3.jpg}}
  
@@ -38, +41 @@

  
  = 2. Converting Join into Map Join Automatically =
  == 2.1 New Join Execution Flow ==
- Since map join is faster than the common join, it would better to run the map join whenever possible. Previously, Hive users need to give a hint in the query to assign which table the small table is. For example, '''''select /*+mapjoin(a)*/ * from src1 x  join src2y on x.key=y.key''''';   It is not a good way for user experience and query performance, because sometimes user may give a wrong hint and also users may not give any hints. It would be much better to convert the Common Join into Map Join without users' hint.
+ Since map join is faster than the common join, it would be better to run the map join whenever possible. Previously, Hive users needed to give a hint in the query to specify which table is the small table. For example, '''''select /*+mapjoin(a)*/ * from src1 x  join src2 y on x.key=y.key'''''. This is not good for user experience or query performance, because users may sometimes give a wrong hint or no hint at all. It would be much better to convert the Common Join into a Map Join without a hint from the user.
  
- Hive-1642 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1642]]) has solved the problem by converting the Common Join into Map Join automatically. For the Map Join, the query processor should know which input table the big table is. Other input table will be recognize as the small table during the execution stage and these tables need to be hold in the memory. However, the query processor has no idea of input file size during compiling time. Because some of the table may be intermediate tables generated from sub queries. So the query processor can only figure out the input file size during the execution time.
+ Hive-1642 ([[http://issues.apache.org/jira/browse/HIVE-1642|http://issues.apache.org/jira/browse/HIVE-1642]]) has solved the problem by converting the Common Join into a Map Join automatically. For the Map Join, the query processor needs to know which input table is the big table. The other input tables are treated as the small tables during the execution stage, and these tables need to be held in memory. However, in general, the query processor does not know the input file sizes at compile time (even with statistics), because some of the tables may be intermediate tables generated from subqueries. So the query processor can only determine the input file sizes at execution time.
  
  Right now, users need to enable this feature with '''''set hive.auto.convert.join = true;'''''
  
@@ -48, +51 @@

  
  '''Fig 5: The Join Execution Flow'''
  
- As shown in fig5, the left side shows the previous Common Join execution flow, which very straightforward. On the other side, the right side is the new Common Join execution flow. During the compile time, the query processor will generate a Conditional Task, which contains a list of tasks and one of these tasks will be resolved to run during the execution time. It means the tasks in the Conditional Task's list are the candidates and one of them will be chosen to run during the run time. First, the original Common Join Task should be put into the list. Also the query processor will generate a series of Map Join Task by assuming each of the input tables may be the big table. Take the same example as before, '''''select /*+mapjoin(a)*/ * from src1 x  join src2y on x.key=y.key'''''. Both table '''''src2 '''''and '''''src1 '''''may be the big table, so it will generate 2 Map Join Task. One is assuming src1 is the big table and the other is assuming src2 is the big table, as shown in Fig 6.
+ As shown in Fig 5, the left side shows the previous Common Join execution flow, which is very straightforward. The right side shows the new Common Join execution flow. At compile time, the query processor generates a Conditional Task, which contains a list of tasks, one of which will be resolved to run at execution time. In other words, the tasks in the Conditional Task's list are candidates, and one of them will be chosen at run time. First, the original Common Join Task is put into the list. The query processor also generates a series of Map Join Tasks by assuming that each of the input tables in turn may be the big table. Take the same example as before, '''''select /*+mapjoin(a)*/ * from src1 x  join src2 y on x.key=y.key'''''. Either '''''src1''''' or '''''src2''''' may be the big table, so the query processor generates 2 Map Join Tasks. One assumes src1 is the big table and the other assumes src2 is the big table, as shown in Fig 6.
  
  {{attachment:fig6.jpg||height="778px",width="1072px"}}
  
  '''Fig 6: Create Map Join Task by Assuming One of the Input Tables is the Big Table'''
  
  == 2.2 Resolving the Join Operation at Run Time ==
- During the execution stage, the Conditional Task can know exactly the  file size of each input table, even the table is a intermediate table.  If all the tables are too large to be converted into map join, then just  run the Common Join Task as previously. If one of the tables is large  and others are small enough to run Map Join, then the Conditional Task  will pick the corresponding Map Join Local Task to run. By this  mechanism, it can convert the Common Join into Map Join automatically  and dynamically.
+ During the execution stage, the Conditional Task knows the exact file size of each input table, even if the table is an intermediate table. If all the tables are too large to be converted into a map join, it just runs the Common Join Task as before. If one of the tables is large and the others are small enough to run a Map Join, the Conditional Task picks the corresponding Map Join Local Task to run. With this mechanism, the Common Join can be converted into a Map Join automatically and dynamically.
  
  Currently, if the total size of the small tables is larger than 25M, the Conditional Task will choose the original Common Join to run. 25M is a very conservative number, and users can change it with '''''set hive.smalltable.filesize = 30000000'''''.
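
The run-time choice can be sketched as a small resolver over the measured input sizes. This is a simplified illustration, not Hive's resolver: the function name `resolve_join` and the returned tuples are hypothetical, 25M is read here as 25,000,000 bytes, and the sketch assumes that only the largest input needs to be tried as the big table, since that choice leaves the smallest total for the remaining tables.

```python
def resolve_join(input_sizes, small_table_limit=25_000_000):
    """Pick a join strategy from a mapping of table alias to file size in bytes.

    Returns ("mapjoin", big_alias) or ("commonjoin", None).
    """
    # Treat the largest input as the big table (assumption made in this sketch).
    big_alias = max(input_sizes, key=input_sizes.get)
    small_total = sum(input_sizes.values()) - input_sizes[big_alias]
    if small_total <= small_table_limit:
        return ("mapjoin", big_alias)
    # All candidate Map Join tasks are rejected: fall back to the Common Join.
    return ("commonjoin", None)

small_case = resolve_join({"src1": 4_000_000, "src2": 9_000_000_000})
large_case = resolve_join({"src1": 40_000_000, "src2": 9_000_000_000})
raised_limit = resolve_join({"src1": 40_000_000, "src2": 9_000_000_000},
                            small_table_limit=50_000_000)
```

In the second call the small side is 40 MB, above the limit, so the Common Join is kept; raising the limit in the third call lets the same inputs run as a Map Join, which mirrors what changing the setting does.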
  
  == 2.3 Backup Task ==
- As mentioned above,  the Local Task of Map Join is a very memory intensive. So the query  processor will launch this task in a child jvm, which has the same heap  size as the Mapper's. Since the Local Task may run out of memory, the  query processor will measure the memory usage of the local task very  carefully. Once the memory usage of the Local Task is higher than a  threshold. This Local Task will abort itself and it means the map join task fails. In this case, the query processor will launch the original Common Join  task as a Backup Task to run, which is totally transparent to user. The  basic idea is shown as Fig 7.
+ As mentioned above, the Local Task of Map Join is very memory intensive. So the query processor launches this task in a child JVM, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor monitors its memory usage very carefully. Once the memory usage of the Local Task exceeds a threshold, the Local Task aborts itself, which means the map join task fails. In this case, the query processor launches the original Common Join task as a Backup Task, which is totally transparent to the user. The basic idea is shown in Fig 7.
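
The backup behaviour amounts to a try-and-fall-back wrapper around the two tasks. The sketch below is illustrative only: `run_with_backup`, `LocalTaskAborted`, and the callables that stand in for tasks are hypothetical names, not Hive classes.

```python
class LocalTaskAborted(Exception):
    """Signals that the Map Join local task gave up because of memory pressure."""

def run_with_backup(map_join_task, common_join_task):
    """Run the Map Join first; if its local task aborts, run the Common Join instead."""
    try:
        return ("mapjoin", map_join_task())
    except LocalTaskAborted:
        # The caller sees a normal result, so the fallback is transparent.
        return ("commonjoin", common_join_task())

def failing_map_join():
    raise LocalTaskAborted("hashtable exceeded the memory threshold")

fallback = run_with_backup(failing_map_join, lambda: "rows from common join")
direct = run_with_backup(lambda: "rows from map join", lambda: "rows from common join")
```

The first element of the returned pair is included only so the example can show which task actually produced the rows.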
  
  {{attachment:fig7.jpg}}