You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/01/28 18:33:15 UTC
svn commit: r904179 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/
ql/src/java/org/apache/hadoop/hive/ql/plan/
Author: namit
Date: Thu Jan 28 17:33:14 2010
New Revision: 904179
URL: http://svn.apache.org/viewvc?rev=904179&view=rev
Log:
HIVE-1093. Add a "skew join map join size" variable to control the
input size of skew join's following map join job
(He Yongqiang via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jan 28 17:33:14 2010
@@ -85,6 +85,10 @@
HIVE-1108. Make QueryPlan serializable
(Zheng Shao via namit)
+ HIVE-1093. Add a "skew join map join size" variable to control the
+ input size of skew join's following map join job
+ (He Yongqiang via namit)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jan 28 17:33:14 2010
@@ -171,7 +171,10 @@
HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
HIVESKEWJOIN("hive.optimize.skewjoin", false),
- HIVESKEWJOINKEY("hive.skewjoin.key", 500000),
+ HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
+ HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
+ HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M
+ MAPREDMINSPLITSIZE("mapred.min.split.size", 1),
HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Jan 28 17:33:14 2010
@@ -294,6 +294,22 @@
</property>
<property>
+ <name>hive.skewjoin.mapjoin.map.tasks</name>
+ <value>10000</value>
+ <description> Determine the number of map tasks used in the follow-up map join job
+ for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split
+ to perform fine-grained control.</description>
+</property>
+
+<property>
+ <name>hive.skewjoin.mapjoin.min.split</name>
+ <value>33554432</value>
+ <description> Determine the maximum number of map tasks used in the follow-up map join job
+ for a skew join by specifying the minimum split size. It should be used together with
+ hive.skewjoin.mapjoin.map.tasks to perform fine-grained control.</description>
+</property>
+
+<property>
<name>hive.mapred.mode</name>
<value>nonstrict</value>
<description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jan 28 17:33:14 2010
@@ -551,9 +551,19 @@
job.setMapOutputKeyClass(HiveKey.class);
job.setMapOutputValueClass(BytesWritable.class);
-
+ if(work.getNumMapTasks() != null) {
+ job.setNumMapTasks(work.getNumMapTasks().intValue());
+ }
+ if(work.getMinSplitSize() != null) {
+ job.setInt(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname,
+ work.getMinSplitSize().intValue());
+ }
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
+
+ if(work.getInputformat() != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
+ }
// Turn on speculative execution for reducers
HiveConf.setVar(job, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Thu Jan 28 17:33:14 2010
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
@@ -91,11 +92,11 @@
* For each table, we launch one mapjoin job, taking the directory containing
* big keys in this table and corresponding dirs in other tables as input.
* (Actually one job for one row in the above.)
- *
+ *
* <p>
* For more discussions, please check
* https://issues.apache.org/jira/browse/HIVE-964.
- *
+ *
*/
public static void processSkewJoin(JoinOperator joinOp,
Task<? extends Serializable> currTask, ParseContext parseCtx)
@@ -152,14 +153,10 @@
Map<Byte, TableDesc> tableDescList = new HashMap<Byte, TableDesc>();
Map<Byte, List<ExprNodeDesc>> newJoinValues = new HashMap<Byte, List<ExprNodeDesc>>();
Map<Byte, List<ExprNodeDesc>> newJoinKeys = new HashMap<Byte, List<ExprNodeDesc>>();
- List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();// used for
- // create
- // mapJoinDesc,
- // should
- // be in
- // order
+ // used for create mapJoinDesc, should be in order
+ List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();
- for (Byte tag : tags) {
+ for (int k = 0; k < tags.length; k++) {
newJoinValueTblDesc.add(null);
}
@@ -302,9 +299,10 @@
HiveConf jc = new HiveConf(parseCtx.getConf(),
GenMRSkewJoinProcessor.class);
- HiveConf.setVar(jc, HiveConf.ConfVars.HIVEINPUTFORMAT,
- org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.class
- .getCanonicalName());
+
+ newPlan.setNumMapTasks(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
+ newPlan.setMinSplitSize(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
+ newPlan.setInputformat(HiveInputFormat.class.getName());
Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
newPlan, jc);
bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
@@ -377,4 +375,4 @@
+ UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
}
-}
\ No newline at end of file
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=904179&r1=904178&r2=904179&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Thu Jan 28 17:33:14 2010
@@ -53,9 +53,12 @@
private Operator<?> reducer;
private Integer numReduceTasks;
+ private Integer numMapTasks;
+ private Integer minSplitSize;
private boolean needsTagging;
private MapredLocalWork mapLocalWork;
+ private String inputformat;
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
@@ -175,7 +178,16 @@
public void setReducer(final Operator<?> reducer) {
this.reducer = reducer;
}
+
+ public Integer getNumMapTasks() {
+ return numMapTasks;
+ }
+
+ public void setNumMapTasks(Integer numMapTasks) {
+ this.numMapTasks = numMapTasks;
+ }
+
/**
* If the number of reducers is -1, the runtime will automatically figure it
* out by input data size.
@@ -277,4 +289,20 @@
this.needsTagging = needsTagging;
}
+ public Integer getMinSplitSize() {
+ return minSplitSize;
+ }
+
+ public void setMinSplitSize(Integer minSplitSize) {
+ this.minSplitSize = minSplitSize;
+ }
+
+ public String getInputformat() {
+ return inputformat;
+ }
+
+ public void setInputformat(String inputformat) {
+ this.inputformat = inputformat;
+ }
+
}