You are viewing a plain text version of this content. The canonical link to it (a hyperlink on the original page) is not preserved in this text.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/18 22:57:33 UTC
svn commit: r1533628 - /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
Author: gunther
Date: Fri Oct 18 20:57:33 2013
New Revision: 1533628
URL: http://svn.apache.org/r1533628
Log:
HIVE-5591: Use TezGroupedSplits to combine splits based on headroom in Tez (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1533628&r1=1533627&r2=1533628&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct 18 20:57:33 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -51,8 +52,10 @@ import org.apache.hadoop.hive.ql.plan.Te
import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -69,7 +72,6 @@ import org.apache.tez.dag.api.InputDescr
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -217,9 +219,21 @@ public class DagUtils {
// finally create the vertex
Vertex map = null;
+ // use tez to combine splits
+ boolean useTezGroupedSplits = false;
+
int numTasks = -1;
Class amSplitGeneratorClass = null;
InputSplitInfo inputSplitInfo = null;
+ Class inputFormatClass = conf.getClass("mapred.input.format.class",
+ InputFormat.class);
+
+ // we'll set up tez to combine spits for us iff the input format
+ // is HiveInputFormat
+ if (inputFormatClass == HiveInputFormat.class) {
+ useTezGroupedSplits = true;
+ conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ }
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
// if we're generating the splits in the AM, we just need to set
@@ -245,7 +259,14 @@ public class DagUtils {
assert mapWork.getAliasToWork().keySet().size() == 1;
String alias = mapWork.getAliasToWork().keySet().iterator().next();
- byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+
+ byte[] mrInput = null;
+ if (useTezGroupedSplits) {
+ mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
+ null, HiveInputFormat.class.getName());
+ } else {
+ mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+ }
map.addInput(alias,
new InputDescriptor(MRInputLegacy.class.getName()).
setUserPayload(mrInput), amSplitGeneratorClass);