Posted to commits@hive.apache.org by na...@apache.org on 2010/01/28 18:33:15 UTC

svn commit: r904179 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/plan/

Author: namit
Date: Thu Jan 28 17:33:14 2010
New Revision: 904179

URL: http://svn.apache.org/viewvc?rev=904179&view=rev
Log:
HIVE-1093. Add a "skew join map join size" variable to control the
input size of the map join job that follows a skew join
(He Yongqiang via namit)
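
These two variables can be tuned per session from the Hive CLI (e.g. "set hive.skewjoin.mapjoin.map.tasks=5000;") or programmatically. A minimal Java sketch of the latter, assuming a HiveConf obtained from the current session; the tuneSkewJoin helper is hypothetical, but the ConfVars entries and Configuration setters are the ones this commit relies on:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SkewJoinTuning {
      // Hypothetical helper: apply the knobs added in HIVE-1093 to a session conf.
      static void tuneSkewJoin(HiveConf conf) {
        // enable skew join handling in the first place
        conf.setBoolean(HiveConf.ConfVars.HIVESKEWJOIN.varname, true);
        // cap the follow-up map join job at 10000 map tasks ...
        conf.setInt(HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK.varname, 10000);
        // ... but never let a split drop below 32MB
        conf.setInt(HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT.varname, 32 * 1024 * 1024);
      }
    }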


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jan 28 17:33:14 2010
@@ -85,6 +85,10 @@
     HIVE-1108. Make QueryPlan serializable
     (Zheng Shao via namit)
 
+    HIVE-1093. Add a "skew join map join size" variable to control the
+    input size of the map join job that follows a skew join
+    (He Yongqiang via namit)
+
 Release 0.5.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jan 28 17:33:14 2010
@@ -171,7 +171,10 @@
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
-    HIVESKEWJOINKEY("hive.skewjoin.key", 500000),
+    HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
+    HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
+    HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M
+    MAPREDMINSPLITSIZE("mapred.min.split.size", 1),
     
     HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
     HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Jan 28 17:33:14 2010
@@ -294,6 +294,22 @@
 </property>
 
 <property>
+  <name>hive.skewjoin.mapjoin.map.tasks</name>
+  <value>10000</value>
+  <description> Determines the number of map tasks used in the follow-up map join job
+	for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split
+	to provide fine-grained control.</description>
+</property>
+
+<property>
+  <name>hive.skewjoin.mapjoin.min.split</name>
+  <value>33554432</value>
+  <description> Determines the maximum number of map tasks used in the follow-up map join job
+	for a skew join by specifying the minimum split size. It should be used together with
+	hive.skewjoin.mapjoin.map.tasks to provide fine-grained control.</description>
+</property>
+
+<property>
   <name>hive.mapred.mode</name>
   <value>nonstrict</value>
   <description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
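
A rough worked example of how the two new skewjoin.mapjoin properties above interact (the exact split math depends on the Hadoop version in use): with a 10GB big-key directory and hive.skewjoin.mapjoin.map.tasks=10000, the goal split size would be about 1MB; hive.skewjoin.mapjoin.min.split=33554432 raises each split to 32MB, so the follow-up job runs roughly 10GB / 32MB = 320 map tasks instead of 10000.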

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jan 28 17:33:14 2010
@@ -551,9 +551,19 @@
 
     job.setMapOutputKeyClass(HiveKey.class);
     job.setMapOutputValueClass(BytesWritable.class);
-
+    if(work.getNumMapTasks() != null) {
+      job.setNumMapTasks(work.getNumMapTasks().intValue());      
+    }
+    if(work.getMinSplitSize() != null) {
+      job.setInt(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, 
+         work.getMinSplitSize().intValue());
+    }
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);
+    
+    if(work.getInputformat() != null) {
+      HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
+    }
 
     // Turn on speculative execution for reducers
     HiveConf.setVar(job, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
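
Forwarding both values to the JobConf matters because the old-API FileInputFormat derives its split size from the requested number of maps and the configured minimum. A simplified sketch of that computation (the shape of org.apache.hadoop.mapred.FileInputFormat's sizing logic, not the exact Hadoop source):

    // Simplified sketch: numMapTasks sets the goal split size,
    // mapred.min.split.size puts a floor under it.
    static long splitSize(long totalSize, int requestedMaps,
                          long minSplitSize, long blockSize) {
      long goalSize = totalSize / Math.max(requestedMaps, 1);
      return Math.max(minSplitSize, Math.min(goalSize, blockSize));
    }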

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Thu Jan 28 17:33:14 2010
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
@@ -91,11 +92,11 @@
    * For each table, we launch one mapjoin job, taking the directory containing
    * big keys in this table and corresponding dirs in other tables as input.
   * (Actually one job for one row in the above.)
-   * 
+   *
    * <p>
    * For more discussions, please check
    * https://issues.apache.org/jira/browse/HIVE-964.
-   * 
+   *
    */
   public static void processSkewJoin(JoinOperator joinOp,
       Task<? extends Serializable> currTask, ParseContext parseCtx)
@@ -152,14 +153,10 @@
     Map<Byte, TableDesc> tableDescList = new HashMap<Byte, TableDesc>();
     Map<Byte, List<ExprNodeDesc>> newJoinValues = new HashMap<Byte, List<ExprNodeDesc>>();
     Map<Byte, List<ExprNodeDesc>> newJoinKeys = new HashMap<Byte, List<ExprNodeDesc>>();
-    List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();// used for
-                                                                     // create
-                                                                     // mapJoinDesc,
-                                                                     // should
-                                                                     // be in
-                                                                     // order
+    // used for create mapJoinDesc, should be in order
+    List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();
 
-    for (Byte tag : tags) {
+    for (int k = 0; k < tags.length; k++) {
       newJoinValueTblDesc.add(null);
     }
 
@@ -302,9 +299,10 @@
 
       HiveConf jc = new HiveConf(parseCtx.getConf(),
           GenMRSkewJoinProcessor.class);
-      HiveConf.setVar(jc, HiveConf.ConfVars.HIVEINPUTFORMAT,
-          org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.class
-              .getCanonicalName());
+
+      newPlan.setNumMapTasks(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
+      newPlan.setMinSplitSize(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
+      newPlan.setInputformat(HiveInputFormat.class.getName());
       Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
           newPlan, jc);
       bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
@@ -377,4 +375,4 @@
         + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
   }
 
-}
\ No newline at end of file
+}
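
Note the input format swap above: the follow-up map join plan now uses plain HiveInputFormat instead of CombineHiveInputFormat, presumably so that the per-plan numMapTasks and minSplitSize settings actually govern split generation rather than being overridden by the combining logic.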

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Thu Jan 28 17:33:14 2010
@@ -53,9 +53,12 @@
   private Operator<?> reducer;
 
   private Integer numReduceTasks;
+  private Integer numMapTasks;
+  private Integer minSplitSize;
 
   private boolean needsTagging;
   private MapredLocalWork mapLocalWork;
+  private String inputformat;
 
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
@@ -175,7 +178,16 @@
   public void setReducer(final Operator<?> reducer) {
     this.reducer = reducer;
   }
+  
 
+  public Integer getNumMapTasks() {
+    return numMapTasks;
+  }
+
+  public void setNumMapTasks(Integer numMapTasks) {
+    this.numMapTasks = numMapTasks;
+  }
+  
   /**
    * If the number of reducers is -1, the runtime will automatically figure it
    * out by input data size.
@@ -277,4 +289,20 @@
     this.needsTagging = needsTagging;
   }
 
+  public Integer getMinSplitSize() {
+    return minSplitSize;
+  }
+
+  public void setMinSplitSize(Integer minSplitSize) {
+    this.minSplitSize = minSplitSize;
+  }
+
+  public String getInputformat() {
+    return inputformat;
+  }
+
+  public void setInputformat(String inputformat) {
+    this.inputformat = inputformat;
+  }
+
 }
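
Taken together: GenMRSkewJoinProcessor records the knobs on the MapredWork plan at compile time, and ExecDriver transfers them onto the JobConf at execution time. A condensed sketch of that hand-off, using only the accessors added in this commit (newPlan, jc, work and job are assumed to come from the surrounding code paths shown above):

    // compile time (GenMRSkewJoinProcessor): read the conf, stash on the plan
    newPlan.setNumMapTasks(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
    newPlan.setMinSplitSize(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
    newPlan.setInputformat(HiveInputFormat.class.getName());

    // execution time (ExecDriver): apply the stashed values to the JobConf
    if (work.getNumMapTasks() != null) {
      job.setNumMapTasks(work.getNumMapTasks().intValue());
    }
    if (work.getMinSplitSize() != null) {
      job.setInt(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname,
          work.getMinSplitSize().intValue());
    }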