You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by js...@apache.org on 2011/03/10 00:10:07 UTC
svn commit: r1080053 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/plan/
Author: jssarma
Date: Wed Mar 9 23:10:07 2011
New Revision: 1080053
URL: http://svn.apache.org/viewvc?rev=1080053&view=rev
Log:
HIVE-2037. Merge result file size should honor hive.merge.size.per.task (Ning Zhang via jssarma)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Mar 9 23:10:07 2011
@@ -279,7 +279,10 @@ public class HiveConf extends Configurat
HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
+ MAPREDMAXSPLITSIZE("mapred.max.split.size", 256000000L),
MAPREDMINSPLITSIZE("mapred.min.split.size", 1L),
+ MAPREDMINSPLITSIZEPERNODE("mapred.min.split.size.per.node", 1L),
+ MAPREDMINSPLITSIZEPERRACK("mapred.min.split.size.per.rack", 1L),
HIVEMERGEMAPONLY("hive.mergejob.maponly", true),
HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Mar 9 23:10:07 2011
@@ -77,7 +77,6 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.FileAppender;
@@ -254,9 +253,23 @@ public class ExecDriver extends Task<Map
if (work.getNumMapTasks() != null) {
job.setNumMapTasks(work.getNumMapTasks().intValue());
}
+
+ if (work.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+ }
+
if (work.getMinSplitSize() != null) {
HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
}
+
+ if (work.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+ }
+
+ if (work.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+ }
+
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
@@ -393,7 +406,7 @@ public class ExecDriver extends Task<Map
if (pwd != null) {
HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
}
-
+
returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
@@ -453,7 +466,7 @@ public class ExecDriver extends Task<Map
return (returnVal);
}
-
+
public boolean mapStarted() {
return this.jobExecHelper.mapStarted();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Wed Mar 9 23:10:07 2011
@@ -117,7 +117,7 @@ public class ConditionalResolverMergeFil
long trgtSize = conf.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESSIZE);
long avgConditionSize = conf
.getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESAVGSIZE);
- trgtSize = trgtSize > avgConditionSize ? trgtSize : avgConditionSize;
+ trgtSize = Math.max(trgtSize, avgConditionSize);
Task<? extends Serializable> mvTask = ctx.getListTasks().get(0);
Task<? extends Serializable> mrTask = ctx.getListTasks().get(1);
@@ -255,8 +255,12 @@ public class ConditionalResolverMergeFil
reducers = Math.min(maxReducers, reducers);
work.setNumReduceTasks(reducers);
}
+ work.setMaxSplitSize(targetSize);
work.setMinSplitSize(targetSize);
+ work.setMinSplitSizePerNode(targetSize);
+ work.setMinSplitSizePerRack(targetSize);
}
+
/**
* Whether to merge files inside directory given the threshold of the average file size.
*
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1080053&r1=1080052&r2=1080053&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Wed Mar 9 23:10:07 2011
@@ -29,9 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
/**
* MapredWork.
@@ -63,7 +60,10 @@ public class MapredWork implements Seria
private Integer numReduceTasks;
private Integer numMapTasks;
+ private Long maxSplitSize;
private Long minSplitSize;
+ private Long minSplitSizePerNode;
+ private Long minSplitSizePerRack;
private boolean needsTagging;
private boolean hadoopSupportsSplittable;
@@ -104,6 +104,10 @@ public class MapredWork implements Seria
this.mapLocalWork = mapLocalWork;
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
this.hadoopSupportsSplittable = hadoopSupportsSplittable;
+ maxSplitSize = null;
+ minSplitSize = null;
+ minSplitSizePerNode = null;
+ minSplitSizePerRack = null;
}
public String getCommand() {
@@ -323,6 +327,14 @@ public class MapredWork implements Seria
this.hadoopSupportsSplittable = hadoopSupportsSplittable;
}
+ public Long getMaxSplitSize() {
+ return maxSplitSize;
+ }
+
+ public void setMaxSplitSize(Long maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
+ }
+
public Long getMinSplitSize() {
return minSplitSize;
}
@@ -331,6 +343,22 @@ public class MapredWork implements Seria
this.minSplitSize = minSplitSize;
}
+ public Long getMinSplitSizePerNode() {
+ return minSplitSizePerNode;
+ }
+
+ public void setMinSplitSizePerNode(Long minSplitSizePerNode) {
+ this.minSplitSizePerNode = minSplitSizePerNode;
+ }
+
+ public Long getMinSplitSizePerRack() {
+ return minSplitSizePerRack;
+ }
+
+ public void setMinSplitSizePerRack(Long minSplitSizePerRack) {
+ this.minSplitSizePerRack = minSplitSizePerRack;
+ }
+
public String getInputformat() {
return inputformat;
}