You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by js...@apache.org on 2011/03/10 00:10:07 UTC

svn commit: r1080053 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/plan/

Author: jssarma
Date: Wed Mar  9 23:10:07 2011
New Revision: 1080053

URL: http://svn.apache.org/viewvc?rev=1080053&view=rev
Log:
HIVE-2037. Merge result file size should honor hive.merge.size.per.task (Ning Zhang via jssarma)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Mar  9 23:10:07 2011
@@ -279,7 +279,10 @@ public class HiveConf extends Configurat
     HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
     HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
+    MAPREDMAXSPLITSIZE("mapred.max.split.size", 256000000L),
     MAPREDMINSPLITSIZE("mapred.min.split.size", 1L),
+    MAPREDMINSPLITSIZEPERNODE("mapred.min.split.size.per.node", 1L), // key suffix must match the constant: node
+    MAPREDMINSPLITSIZEPERRACK("mapred.min.split.size.per.rack", 1L), // key suffix must match the constant: rack
     HIVEMERGEMAPONLY("hive.mergejob.maponly", true),
 
     HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Mar  9 23:10:07 2011
@@ -77,7 +77,6 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
 import org.apache.log4j.Appender;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.FileAppender;
@@ -254,9 +253,23 @@ public class ExecDriver extends Task<Map
     if (work.getNumMapTasks() != null) {
       job.setNumMapTasks(work.getNumMapTasks().intValue());
     }
+
+    if (work.getMaxSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+    }
+
     if (work.getMinSplitSize() != null) {
       HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
     }
+
+    if (work.getMinSplitSizePerNode() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+    }
+
+    if (work.getMinSplitSizePerRack() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+    }
+
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);
 
@@ -393,7 +406,7 @@ public class ExecDriver extends Task<Map
       if (pwd != null) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
-      
+
       returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
     } catch (Exception e) {
@@ -453,7 +466,7 @@ public class ExecDriver extends Task<Map
 
     return (returnVal);
   }
-  
+
   public boolean mapStarted() {
     return this.jobExecHelper.mapStarted();
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Wed Mar  9 23:10:07 2011
@@ -117,7 +117,7 @@ public class ConditionalResolverMergeFil
     long trgtSize = conf.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESSIZE);
     long avgConditionSize = conf
         .getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESAVGSIZE);
-    trgtSize = trgtSize > avgConditionSize ? trgtSize : avgConditionSize;
+    trgtSize = Math.max(trgtSize, avgConditionSize);
 
     Task<? extends Serializable> mvTask = ctx.getListTasks().get(0);
     Task<? extends Serializable> mrTask = ctx.getListTasks().get(1);
@@ -255,8 +255,12 @@ public class ConditionalResolverMergeFil
       reducers = Math.min(maxReducers, reducers);
       work.setNumReduceTasks(reducers);
     }
+    work.setMaxSplitSize(targetSize);
     work.setMinSplitSize(targetSize);
+    work.setMinSplitSizePerNode(targetSize);
+    work.setMinSplitSizePerRack(targetSize);
   }
+
   /**
    * Whether to merge files inside directory given the threshold of the average file size.
    *

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Wed Mar  9 23:10:07 2011
@@ -29,9 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
  * MapredWork.
@@ -63,7 +60,10 @@ public class MapredWork implements Seria
 
   private Integer numReduceTasks;
   private Integer numMapTasks;
+  private Long maxSplitSize;
   private Long minSplitSize;
+  private Long minSplitSizePerNode;
+  private Long minSplitSizePerRack;
 
   private boolean needsTagging;
   private boolean hadoopSupportsSplittable;
@@ -104,6 +104,10 @@ public class MapredWork implements Seria
     this.mapLocalWork = mapLocalWork;
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
     this.hadoopSupportsSplittable = hadoopSupportsSplittable;
+    maxSplitSize = null;
+    minSplitSize = null;
+    minSplitSizePerNode = null;
+    minSplitSizePerRack = null;
   }
 
   public String getCommand() {
@@ -323,6 +327,14 @@ public class MapredWork implements Seria
     this.hadoopSupportsSplittable = hadoopSupportsSplittable;
   }
 
+  public Long getMaxSplitSize() {
+    return maxSplitSize; // null = unset; ExecDriver only applies MAPREDMAXSPLITSIZE when this is non-null
+  }
+
+  public void setMaxSplitSize(Long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize; // ConditionalResolverMergeFiles passes its merge target size here
+  }
+
   public Long getMinSplitSize() {
     return minSplitSize;
   }
@@ -331,6 +343,22 @@ public class MapredWork implements Seria
     this.minSplitSize = minSplitSize;
   }
 
+  public Long getMinSplitSizePerNode() {
+    return minSplitSizePerNode; // null = unset; ExecDriver only applies MAPREDMINSPLITSIZEPERNODE when non-null
+  }
+
+  public void setMinSplitSizePerNode(Long minSplitSizePerNode) {
+    this.minSplitSizePerNode = minSplitSizePerNode; // ConditionalResolverMergeFiles passes its merge target size here
+  }
+
+  public Long getMinSplitSizePerRack() {
+    return minSplitSizePerRack; // null = unset; ExecDriver only applies MAPREDMINSPLITSIZEPERRACK when non-null
+  }
+
+  public void setMinSplitSizePerRack(Long minSplitSizePerRack) {
+    this.minSplitSizePerRack = minSplitSizePerRack; // ConditionalResolverMergeFiles passes its merge target size here
+  }
+
   public String getInputformat() {
     return inputformat;
   }