You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/24 22:17:33 UTC

svn commit: r1471634 [1/2] - in /hive/branches/branch-0.11/ql/src: java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java test/queries/clientpositive/join32_lessSize.q test/results/clientpositive/join32_lessSize.q.out

Author: hashutosh
Date: Wed Apr 24 20:17:33 2013
New Revision: 1471634

URL: http://svn.apache.org/r1471634
Log:
HIVE-3996 : Correctly enforce the memory limit on the multi-table map-join (Vikram Dixit via Ashutosh Chauhan)

Added:
    hive/branches/branch-0.11/ql/src/test/queries/clientpositive/join32_lessSize.q
    hive/branches/branch-0.11/ql/src/test/results/clientpositive/join32_lessSize.q.out
Modified:
    hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java

Modified: hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1471634&r1=1471633&r2=1471634&view=diff
==============================================================================
--- hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/branches/branch-0.11/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Wed Apr 24 20:17:33 2013
@@ -120,6 +120,8 @@ public class CommonJoinResolver implemen
    */
   class CommonJoinTaskDispatcher implements Dispatcher {
 
+    HashMap<String, Long> aliasToSize = null;
+
     private final PhysicalContext physicalContext;
 
     public CommonJoinTaskDispatcher(PhysicalContext context) {
@@ -145,7 +147,7 @@ public class CommonJoinResolver implemen
      * A task and its child task has been converted from join to mapjoin.
      * See if the two tasks can be merged.
      */
-    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
+    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
       MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
       MapredWork work = task.getWork();
       MapredLocalWork localWork = work.getMapLocalWork();
@@ -194,6 +196,33 @@ public class CommonJoinResolver implemen
       if (childWork.getAliasToWork().size() > 1) {
         return;
       }
+      long mapJoinSize = HiveConf.getLongVar(conf,
+          HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      long localTableTotalSize = 0;
+      for (String alias : localWork.getAliasToWork().keySet()) {
+        Long tabSize = aliasToSize.get(alias);
+        if (tabSize == null) {
+          /* If the size is unavailable, treat the table as larger than mapJoinSize:
+           * the merge cannot happen, so return without merging.
+           */
+          return;
+        }
+        localTableTotalSize += tabSize;
+      }
+
+      for (String alias : childLocalWork.getAliasToWork().keySet()) {
+        Long tabSize = aliasToSize.get(alias);
+        if (tabSize == null) {
+          /* If the size is unavailable, treat the table as larger than mapJoinSize:
+           * the merge cannot happen, so return without merging.
+           */
+          return;
+        }
+        localTableTotalSize += tabSize;
+        if (localTableTotalSize > mapJoinSize) {
+          return;
+        }
+      }
 
       Operator<? extends Serializable> childAliasOp =
           childWork.getAliasToWork().values().iterator().next();
@@ -279,7 +308,10 @@ public class CommonJoinResolver implemen
       int numAliases = order.length;
 
       long aliasTotalKnownInputSize = 0;
-      HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
+
+      if (aliasToSize == null) {
+        aliasToSize = new HashMap<String, Long>();
+      }
       try {
         // go over all the input paths, and calculate a known total size, known
         // size for each input alias.
@@ -380,7 +412,7 @@ public class CommonJoinResolver implemen
           // followed by a mapjoin can be performed in a single MR job.
           if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
               && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
-            mergeMapJoinTaskWithChildMapJoinTask(newTask);
+            mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
           }
 
           return newTask;

Added: hive/branches/branch-0.11/ql/src/test/queries/clientpositive/join32_lessSize.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.11/ql/src/test/queries/clientpositive/join32_lessSize.q?rev=1471634&view=auto
==============================================================================
--- hive/branches/branch-0.11/ql/src/test/queries/clientpositive/join32_lessSize.q (added)
+++ hive/branches/branch-0.11/ql/src/test/queries/clientpositive/join32_lessSize.q Wed Apr 24 20:17:33 2013
@@ -0,0 +1,66 @@
+CREATE TABLE dest_j1(key STRING, value STRING, val2 STRING) STORED AS TEXTFILE;
+CREATE TABLE dest_j2(key STRING, value STRING, val2 STRING) STORED AS TEXTFILE;
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=6000;
+
+-- Since the inputs are small, it should be automatically converted to mapjoin
+
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE dest_j1
+SELECT x.key, z.value, y.value
+FROM src1 x JOIN src y ON (x.key = y.key) 
+JOIN srcpart z ON (x.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+INSERT OVERWRITE TABLE dest_j1
+SELECT x.key, z.value, y.value
+FROM src1 x JOIN src y ON (x.key = y.key) 
+JOIN srcpart z ON (x.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE dest_j1
+SELECT x.key, z.value, y.value
+FROM src w JOIN src1 x ON (x.value = w.value) 
+JOIN src y ON (x.key = y.key) 
+JOIN src1 z ON (x.key = z.key);
+
+select * from dest_j1 x order by x.key;
+
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, z.value, res.value
+FROM (select x.key, x.value from src1 x JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart z ON (res.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, z.value, res.value
+FROM (select x.key, x.value from src1 x JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart z ON (res.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+select * from dest_j2 x order by x.key;
+
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, z.value, res.value
+FROM (select x.key, x.value from src1 x LEFT OUTER JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart z ON (res.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, z.value, res.value
+FROM (select x.key, x.value from src1 x LEFT OUTER JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart z ON (res.value = z.value and z.ds='2008-04-08' and z.hr=11);
+
+select * from dest_j2 x order by x.key;
+
+EXPLAIN
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, x.value, res.value  
+FROM (select x.key, x.value from src1 x JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart x ON (res.value = x.value and x.ds='2008-04-08' and x.hr=11);
+
+EXPLAIN
+INSERT OVERWRITE TABLE dest_j2
+SELECT res.key, y.value, res.value
+FROM (select x.key, x.value from src1 x JOIN src y ON (x.key = y.key)) res 
+JOIN srcpart y ON (res.value = y.value and y.ds='2008-04-08' and y.hr=11);