You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/18 22:57:33 UTC

svn commit: r1533628 - /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java

Author: gunther
Date: Fri Oct 18 20:57:33 2013
New Revision: 1533628

URL: http://svn.apache.org/r1533628
Log:
HIVE-5591: Use TezGroupedSplits to combine splits based on headroom in Tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1533628&r1=1533627&r2=1533628&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct 18 20:57:33 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -51,8 +52,10 @@ import org.apache.hadoop.hive.ql.plan.Te
 import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -69,7 +72,6 @@ import org.apache.tez.dag.api.InputDescr
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -217,9 +219,21 @@ public class DagUtils {
     // finally create the vertex
     Vertex map = null;
 
+    // use tez to combine splits
+    boolean useTezGroupedSplits = false;
+
     int numTasks = -1;
     Class amSplitGeneratorClass = null;
     InputSplitInfo inputSplitInfo = null;
+    Class inputFormatClass = conf.getClass("mapred.input.format.class",
+        InputFormat.class);
+
+    // we'll set up tez to combine splits for us iff the input format
+    // is HiveInputFormat
+    if (inputFormatClass == HiveInputFormat.class) {
+      useTezGroupedSplits = true;
+      conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+    }
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
       // if we're generating the splits in the AM, we just need to set
@@ -245,7 +259,14 @@ public class DagUtils {
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
-    byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+
+    byte[] mrInput = null;
+    if (useTezGroupedSplits) {
+      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
+          null, HiveInputFormat.class.getName());
+    } else {
+      mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+    }
     map.addInput(alias,
         new InputDescriptor(MRInputLegacy.class.getName()).
                setUserPayload(mrInput), amSplitGeneratorClass);