You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/10/15 02:45:11 UTC

[2/2] tez git commit: TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapReduceSplitsGrouper. (sseth)

TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and
TezMapReduceSplitsGrouper. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6632903b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6632903b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6632903b

Branch: refs/heads/master
Commit: 6632903bb3cb70b4717b64f3f78664f34812ec5b
Parents: c89e352
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 14 17:42:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 14 17:42:35 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tez-mapreduce/findbugs-exclude.xml              |  14 +
 .../hadoop/mapred/split/TezGroupedSplit.java    |  11 +-
 .../mapred/split/TezMapredSplitsGrouper.java    | 459 ++-------------
 .../hadoop/mapreduce/split/TezGroupedSplit.java |  19 +-
 .../split/TezMapReduceSplitsGrouper.java        | 573 ++++---------------
 .../common/MRInputAMSplitGenerator.java         |   5 +-
 .../grouper/GroupedSplitContainer.java          |  74 +++
 .../grouper/MapReduceSplitContainer.java        |  64 +++
 .../mapreduce/grouper/MapredSplitContainer.java |  64 +++
 .../tez/mapreduce/grouper/SplitContainer.java   |  41 ++
 .../grouper/SplitSizeEstimatorWrapper.java      |  30 +
 .../SplitSizeEstimatorWrapperMapReduce.java     |  35 ++
 .../SplitSizeEstimatorWrapperMapred.java        |  35 ++
 .../tez/mapreduce/grouper/TezSplitGrouper.java  | 571 ++++++++++++++++++
 .../hadoop/mapred/split/TestGroupedSplits.java  |  11 +-
 16 files changed, 1120 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17735b5..7c2f030 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
   TEZ-2887. Tez build failure due to missing dependency in pom files.
+  TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper.
 
 
 Release 0.8.1-alpha: 2015-10-12

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/findbugs-exclude.xml b/tez-mapreduce/findbugs-exclude.xml
index 873d4a2..ec64739 100644
--- a/tez-mapreduce/findbugs-exclude.xml
+++ b/tez-mapreduce/findbugs-exclude.xml
@@ -70,6 +70,13 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/>
+    <Method name="getLocations"/>
+    <Field name="locations"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+  <Match>
     <Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/>
     <Method name="getNewFormatSplits"/>
     <Field name="newFormatSplits"/>
@@ -98,6 +105,13 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/>
+    <Method name="&lt;init&gt;"/>
+    <Field name="locations"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
     <Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/>
     <Method name="&lt;init&gt;"/>
     <Field name="oldFormatSplits"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index a9893aa..bc58043 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -53,7 +53,16 @@ public class TezGroupedSplit implements InputSplit, Configurable {
   public TezGroupedSplit() {
     
   }
-  
+
+  public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName,
+                         String[] locations, String rack, long length) {
+    this.wrappedSplits = wrappedSplits;
+    this.wrappedInputFormatName = wrappedInputFormatName;
+    this.locations = locations;
+    this.rack = rack;
+    this.length = length;
+  }
+
   public TezGroupedSplit(int numSplits, String wrappedInputFormatName,
       String[] locations, String rack) {
     this.wrappedSplits = new ArrayList<InputSplit>(numSplits);

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index 2194551..f2a8a0c 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -19,25 +19,23 @@
 package org.apache.hadoop.mapred.split;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
+import org.apache.tez.mapreduce.grouper.MapredSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.base.Preconditions;
 
 /**
  * A Helper that provides grouping logic to group InputSplits
@@ -46,58 +44,9 @@ import com.google.common.base.Preconditions;
  */
 @Public
 @Evolving
-public class TezMapredSplitsGrouper {
+public class TezMapredSplitsGrouper extends TezSplitGrouper {
   private static final Logger LOG = LoggerFactory.getLogger(TezMapredSplitsGrouper.class);
 
-  static class SplitHolder {
-    InputSplit split;
-    boolean isProcessed = false;
-    SplitHolder(InputSplit split) {
-      this.split = split;
-    }
-  }
-  
-  static class LocationHolder {
-    List<SplitHolder> splits;
-    int headIndex = 0;
-    LocationHolder(int capacity) {
-      splits = new ArrayList<SplitHolder>(capacity);
-    }
-    boolean isEmpty() {
-      return (headIndex == splits.size());
-    }
-    SplitHolder getUnprocessedHeadSplit() {
-      while (!isEmpty()) {
-        SplitHolder holder = splits.get(headIndex);
-        if (!holder.isProcessed) {
-          return holder;
-        }
-        incrementHeadIndex();
-      }
-      return null;
-    }
-    void incrementHeadIndex() {
-      headIndex++;
-    }
-  }
-  
-  private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
-
-  static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
-    @Override
-    public long getEstimatedSize(InputSplit split) throws IOException {
-      return split.getLength();
-    }
-  }
-
-  Map<String, LocationHolder> createLocationsMap(Configuration conf) {
-    if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
-      return new TreeMap<String, LocationHolder>();
-    }
-    return new HashMap<String, LocationHolder>();
-  }
-  
   public InputSplit[] getGroupedSplits(Configuration conf,
       InputSplit[] originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException {
@@ -107,367 +56,41 @@ public class TezMapredSplitsGrouper {
   public InputSplit[] getGroupedSplits(Configuration conf,
       InputSplit[] originalSplits, int desiredNumSplits,
       String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException {
-    LOG.info("Grouping splits in Tez");
     Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
 
-    int configNumSplits = conf.getInt(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_COUNT, 0);
-    if (configNumSplits > 0) {
-      // always use config override if specified
-      desiredNumSplits = configNumSplits;
-      LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
-    }
-
-    if (estimator == null) {
-      estimator = DEFAULT_SPLIT_ESTIMATOR;
-    }
-
-    if (! (configNumSplits > 0 || 
-          originalSplits.length == 0) ) {
-      // numSplits has not been overridden by config
-      // numSplits has been set at runtime
-      // there are splits generated
-      // Do sanity checks
-      long totalLength = 0;
-      for (InputSplit split : originalSplits) {
-        totalLength += estimator.getEstimatedSize(split);
-      }
-
-      int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length;
-      long lengthPerGroup = totalLength/splitCount;
-      
-      long maxLengthPerGroup = conf.getLong(
-          TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE,
-          TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
-      long minLengthPerGroup = conf.getLong(
-          TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE,
-          TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
-      if (maxLengthPerGroup < minLengthPerGroup || 
-          minLengthPerGroup <=0) {
-        throw new TezUncheckedException(
-          "Invalid max/min group lengths. Required min>0, max>=min. " +
-          " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
-      }
-      if (lengthPerGroup > maxLengthPerGroup) {
-        // splits too big to work. Need to override with max size.
-        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too small. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Max splitLength: " + maxLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.length);
-        
-        desiredNumSplits = newDesiredNumSplits;
-      } else if (lengthPerGroup < minLengthPerGroup) {
-        // splits too small to work. Need to override with size.
-        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too large. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Min splitLength: " + minLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.length);
-        
-        desiredNumSplits = newDesiredNumSplits;
-      }
-    }
-    
-    if (originalSplits == null) {
-      LOG.info("Null original splits");
-      return null;
-    }
-    
-    if (desiredNumSplits == 0 ||
-        originalSplits.length == 0 ||
-        desiredNumSplits >= originalSplits.length) {
-      // nothing set. so return all the splits as is
-      LOG.info("Using original number of splits: " + originalSplits.length +
-          " desired splits: " + desiredNumSplits);
-      InputSplit[] groupedSplits = new TezGroupedSplit[originalSplits.length];
-      int i=0;
-      for (InputSplit split : originalSplits) {
-        TezGroupedSplit newSplit = 
-            new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
-        newSplit.addSplit(split);
-        groupedSplits[i++] = newSplit;
-      }
-      return groupedSplits;
-    }
-    
-    String emptyLocation = "EmptyLocation";
-    String[] emptyLocations = {emptyLocation};
-    List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits);
-    
-    long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
-    // go through splits and add them to locations
-    for (InputSplit split : originalSplits) {
-      totalLength += estimator.getEstimatedSize(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations ) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        distinctLocations.put(location, null);
-      }
-    }
-    
-    long lengthPerGroup = totalLength/desiredNumSplits;
-    int numNodeLocations = distinctLocations.size();
-    int numSplitsPerLocation = originalSplits.length/numNodeLocations;
-    int numSplitsInGroup = originalSplits.length/desiredNumSplits;
-
-    // allocation loop here so that we have a good initial size for the lists
-    for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
-    }
-    
-    Set<String> locSet = new HashSet<String>();
-    for (InputSplit split : originalSplits) {
-      locSet.clear();
-      SplitHolder splitHolder = new SplitHolder(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        locSet.add(location);
-      }
-      for (String location : locSet) {
-        LocationHolder holder = distinctLocations.get(location);
-        holder.splits.add(splitHolder);
-      }
-    }
-    
-    boolean groupByLength = conf.getBoolean(
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH,
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
-    boolean groupByCount = conf.getBoolean(
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT,
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
-    if (!(groupByLength || groupByCount)) {
-      throw new TezUncheckedException(
-          "None of the grouping parameters are true: "
-              + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
-              + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT);
-    }
-    LOG.info("Desired numSplits: " + desiredNumSplits +
-        " lengthPerGroup: " + lengthPerGroup +
-        " numLocations: " + numNodeLocations +
-        " numSplitsPerLocation: " + numSplitsPerLocation +
-        " numSplitsInGroup: " + numSplitsInGroup + 
-        " totalLength: " + totalLength +
-        " numOriginalSplits: " + originalSplits.length +
-        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-    
-    // go through locations and group splits
-    int splitsProcessed = 0;
-    List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup+1);
-    Set<String> groupLocationSet = new HashSet<String>(10);
-    boolean allowSmallGroups = false;
-    boolean doingRackLocal = false;
-    int iterations = 0;
-    while (splitsProcessed < originalSplits.length) {
-      iterations++;
-      int numFullGroupsCreated = 0;
-      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-        group.clear();
-        groupLocationSet.clear();
-        String location = entry.getKey();
-        LocationHolder holder = entry.getValue();
-        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
-        if (splitHolder == null) {
-          // all splits on node processed
-          continue;
-        }
-        int oldHeadIndex = holder.headIndex;
-        long groupLength = 0;
-        int groupNumSplits = 0;
-        do {
-          group.add(splitHolder);
-          groupLength += estimator.getEstimatedSize(splitHolder.split);
-          groupNumSplits++;
-          holder.incrementHeadIndex();
-          splitHolder = holder.getUnprocessedHeadSplit();
-        } while(splitHolder != null  
-            && (!groupByLength || 
-                (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
-            && (!groupByCount || 
-                (groupNumSplits + 1 <= numSplitsInGroup)));
-
-        if (holder.isEmpty() 
-            && !allowSmallGroups
-            && (!groupByLength || groupLength < lengthPerGroup/2)
-            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
-          // group too small, reset it
-          holder.headIndex = oldHeadIndex;
-          continue;
-        }
-        
-        numFullGroupsCreated++;
-
-        // One split group created
-        String[] groupLocation = {location};
-        if (location == emptyLocation) {
-          groupLocation = null;
-        } else if (doingRackLocal) {
-          for (SplitHolder splitH : group) {
-            String[] locations = splitH.split.getLocations();
-            if (locations != null) {
-              for (String loc : locations) {
-                if (loc != null) {
-                  groupLocationSet.add(loc);
-                }
-              }
-            }
-          }
-          groupLocation = groupLocationSet.toArray(groupLocation);
-        }
-        TezGroupedSplit groupedSplit = 
-            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
-                groupLocation, 
-                // pass rack local hint directly to AM
-                ((doingRackLocal && location != emptyLocation)?location:null));
-        for (SplitHolder groupedSplitHolder : group) {
-          groupedSplit.addSplit(groupedSplitHolder.split);
-          Preconditions.checkState(groupedSplitHolder.isProcessed == false, 
-              "Duplicates in grouping at location: " + location);
-          groupedSplitHolder.isProcessed = true;
-          splitsProcessed++;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Grouped " + group.size()
-              + " length: " + groupedSplit.getLength()
-              + " split at: " + location);
-        }
-        groupedSplitsList.add(groupedSplit);
-      }
-      
-      if (!doingRackLocal && numFullGroupsCreated < 1) {
-        // no node could create a node-local group. go rack-local
-        doingRackLocal = true;
-        // re-create locations
-        int numRemainingSplits = originalSplits.length - splitsProcessed;
-        Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
-        // gather remaining splits.
-        for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-          LocationHolder locHolder = entry.getValue();
-          while (!locHolder.isEmpty()) {
-            SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
-            if (splitHolder != null) {
-              remainingSplits.add(splitHolder.split);
-              locHolder.incrementHeadIndex();
-            }
-          }
-        }
-        if (remainingSplits.size() != numRemainingSplits) {
-          throw new TezUncheckedException("Expected: " + numRemainingSplits 
-              + " got: " + remainingSplits.size());
-        }
-        
-        // doing all this now instead of up front because the number of remaining
-        // splits is expected to be much smaller
-        RackResolver.init(conf);
-        Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
-        for (String location : distinctLocations.keySet()) {
-          String rack = emptyLocation;
-          if (location != emptyLocation) {
-            rack = RackResolver.resolve(location).getNetworkLocation();
-          }
-          locToRackMap.put(location, rack);
-          if (rackLocations.get(rack) == null) {
-            // splits will probably be located in all racks
-            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
-          }
-        }
-        distinctLocations.clear();
-        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        int numRackSplitsToGroup = remainingSplits.size();
-        for (InputSplit split : originalSplits) {
-          if (numRackSplitsToGroup == 0) {
-            break;
-          }
-          // Iterate through the original splits in their order and consider them for grouping. 
-          // This maintains the original ordering in the list and thus subsequent grouping will 
-          // maintain that order
-          if (!remainingSplits.contains(split)) {
-            continue;
-          }
-          numRackSplitsToGroup--;
-          rackSet.clear();
-          SplitHolder splitHolder = new SplitHolder(split);
-          String[] locations = split.getLocations();
-          if (locations == null || locations.length == 0) {
-            locations = emptyLocations;
+    List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits),
+        new Function<InputSplit, SplitContainer>() {
+          @Override
+          public SplitContainer apply(InputSplit input) {
+            return new MapredSplitContainer(input);
           }
-          for (String location : locations ) {
-            if ( location == null) {
-              location = emptyLocation;
+        });
+
+    try {
+      List<InputSplit> resultList = Lists.transform(super
+              .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
+                  wrappedInputFormatName, estimator == null ? null :
+                  new SplitSizeEstimatorWrapperMapred(estimator)),
+          new Function<GroupedSplitContainer, InputSplit>() {
+            @Override
+            public InputSplit apply(GroupedSplitContainer input) {
+              List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(),
+                  new Function<SplitContainer, InputSplit>() {
+                    @Override
+                    public InputSplit apply(SplitContainer input) {
+                      return ((MapredSplitContainer) input).getRawSplit();
+                    }
+                  });
+
+
+              return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(),
+                  input.getLocations(), input.getRack(), input.getLength());
             }
-            rackSet.add(locToRackMap.get(location));
-          }
-          for (String rack : rackSet) {
-            rackLocations.get(rack).splits.add(splitHolder);
-          }
-        }
-        remainingSplits.clear();
-        distinctLocations = rackLocations;
-        // adjust split length to be smaller because the data is non local
-        float rackSplitReduction = conf.getFloat(
-            TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
-            TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
-        if (rackSplitReduction > 0) {
-          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
-          int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
-          if (newLengthPerGroup > 0) {
-            lengthPerGroup = newLengthPerGroup;
-          }
-          if (newNumSplitsInGroup > 0) {
-            numSplitsInGroup = newNumSplitsInGroup;
-          }
-        }
-        
-        LOG.info("Doing rack local after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size() +
-            " lengthPerGroup: " + lengthPerGroup +
-            " numSplitsInGroup: " + numSplitsInGroup);
-        
-        // dont do smallGroups for the first pass
-        continue;
-      }
-      
-      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
-        // a few nodes have a lot of data or data is thinly spread across nodes
-        // so allow small groups now        
-        allowSmallGroups = true;
-        LOG.info("Allowing small groups after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size());
-      }
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplitsList.size());
-      }
+          });
+      InputSplit[] resultArr = resultList.toArray(new InputSplit[resultList.size()]);
+      return resultArr;
+    } catch (InterruptedException e) {
+      throw new IOException(e);
     }
-    InputSplit[] groupedSplits = new InputSplit[groupedSplitsList.size()];
-    groupedSplitsList.toArray(groupedSplits);
-    LOG.info("Number of splits desired: " + desiredNumSplits + 
-        " created: " + groupedSplitsList.size() + 
-        " splitsProcessed: " + splitsProcessed);
-    return groupedSplits;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index 430d2ec..2d198ad 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configurable;
@@ -50,11 +51,25 @@ public class TezGroupedSplit extends InputSplit
   String rack = null;
   long length = 0;
   Configuration conf;
-  
+
+  @InterfaceAudience.Private
   public TezGroupedSplit() {
     
   }
-  
+
+  @InterfaceAudience.Private
+  /**
+   * Meant for internal usage only
+   */
+  public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName,
+                         String[] locations, String rack, long length) {
+    this.wrappedSplits = wrappedSplits;
+    this.wrappedInputFormatName = wrappedInputFormatName;
+    this.locations = locations;
+    this.rack = rack;
+    this.length = length;
+  }
+
   public TezGroupedSplit(int numSplits, String wrappedInputFormatName,
       String[] locations, String rack) {
     this.wrappedSplits = new ArrayList<InputSplit>(numSplits);

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 4be3931..87729bd 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -19,26 +19,24 @@
 package org.apache.hadoop.mapreduce.split;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
+import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapReduce;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Helper that provides a grouping of input splits based 
@@ -47,117 +45,94 @@ import com.google.common.base.Preconditions;
  */
 @Public
 @Evolving
-public class TezMapReduceSplitsGrouper {
+public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
   private static final Logger LOG = LoggerFactory.getLogger(TezMapReduceSplitsGrouper.class);
 
   /**
-   * Specify the number of splits desired to be created
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_COUNT = "tez.grouping.split-count";
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT;
+
   /**
-   * Limit the number of splits in a group by the total length of the splits in the group
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = "tez.grouping.by-length";
-  public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = true;
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH;
   /**
-   * Limit the number of splits in a group by the number of splits in the group
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_BY_COUNT = "tez.grouping.by-count";
-  public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = false;
+  @Deprecated
+  public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT;
 
   /**
-   * The multiplier for available queue capacity when determining number of
-   * tasks for a Vertex. 1.7 with 100% queue available implies generating a
-   * number of tasks roughly equal to 170% of the available containers on the
-   * queue. This enables multiple waves of mappers where the final wave is slightly smaller
-   * than the remaining waves. The gap helps overlap the final wave with any slower tasks
-   * from previous waves and tries to hide the delays from the slower tasks. Good values for 
-   * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or
-   * shorter.
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_WAVES = "tez.grouping.split-waves";
-  public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = 1.7f;
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_BY_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT;
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT;
 
   /**
-   * Upper bound on the size (in bytes) of a grouped split, to avoid generating excessively large splits.
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_WAVES = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES;
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = "tez.grouping.max-size";
-  public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = 1024*1024*1024L;
+  @Deprecated
+  public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT;
 
   /**
-   * Lower bound on the size (in bytes) of a grouped split, to avoid generating too many small splits.
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE;
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = "tez.grouping.min-size";
-  public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = 50*1024*1024L;
+  @Deprecated
+  public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT;
 
   /**
-   * This factor is used to decrease the per group desired (length and count) limits for groups
-   * created by combining splits within a rack. Since reading this split involves reading data intra
-   * rack, the group is made smaller to cover up for the increased latencies in doing intra rack 
-   * reads. The value should be a fraction <= 1.
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = 
-                                              "tez.grouping.rack-split-reduction";
-  public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f;
-  
+  @Deprecated
+  public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE;
   /**
-   * Repeated invocations of grouping on the same splits with the same parameters will produce the 
-   * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a
-   * large number of jobs reading the same hot data. True by default.
+   * @deprecated See equivalent in {@link TezSplitGrouper}
    */
-  public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable";
-  public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
+  @Deprecated
+  public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT;
 
-  static class SplitHolder {
-    InputSplit split;
-    boolean isProcessed = false;
-    SplitHolder(InputSplit split) {
-      this.split = split;
-    }
-  }
-  
-  static class LocationHolder {
-    List<SplitHolder> splits;
-    int headIndex = 0;
-    LocationHolder(int capacity) {
-      splits = new ArrayList<SplitHolder>(capacity);
-    }
-    boolean isEmpty() {
-      return (headIndex == splits.size());
-    }
-    SplitHolder getUnprocessedHeadSplit() {
-      while (!isEmpty()) {
-        SplitHolder holder = splits.get(headIndex);
-        if (!holder.isProcessed) {
-          return holder;
-        }
-        incrementHeadIndex();
-      }
-      return null;
-    }
-    void incrementHeadIndex() {
-      headIndex++;
-    }
-  }
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = 
+                                              TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION;
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT;
 
-  private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final String TEZ_GROUPING_REPEATABLE = TezSplitGrouper.TEZ_GROUPING_REPEATABLE;
+  /**
+   * @deprecated See equivalent in {@link TezSplitGrouper}
+   */
+  @Deprecated
+  public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT;
 
-  static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
-    @Override
-    public long getEstimatedSize(InputSplit split) throws InterruptedException,
-        IOException {
-      return split.getLength();
-    }
-  }
 
-  Map<String, LocationHolder> createLocationsMap(Configuration conf) {
-    if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
-      return new TreeMap<String, LocationHolder>();
-    }
-    return new HashMap<String, LocationHolder>();
-  }
-  
   public List<InputSplit> getGroupedSplits(Configuration conf,
       List<InputSplit> originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException, InterruptedException {
@@ -166,370 +141,48 @@ public class TezMapReduceSplitsGrouper {
   }
 
   public List<InputSplit> getGroupedSplits(Configuration conf,
-      List<InputSplit> originalSplits, int desiredNumSplits,
-      String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException {
-    LOG.info("Grouping splits in Tez");
+                                           List<InputSplit> originalSplits, int desiredNumSplits,
+                                           String wrappedInputFormatName,
+                                           SplitSizeEstimator estimator) throws IOException,
+      InterruptedException {
     Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
+    List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits,
+        new Function<InputSplit, SplitContainer>() {
+          @Override
+          public SplitContainer apply(InputSplit input) {
+            return new MapReduceSplitContainer(input);
+          }
+        });
 
-    int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
-    if (configNumSplits > 0) {
-      // always use config override if specified
-      desiredNumSplits = configNumSplits;
-      LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
-    }
-
-    if (estimator == null) {
-      estimator = DEFAULT_SPLIT_ESTIMATOR;
-    }
 
-    if (! (configNumSplits > 0 || 
-          originalSplits.size() == 0)) {
-      // numSplits has not been overridden by config
-      // numSplits has been set at runtime
-      // there are splits generated
-      // desired splits is less than number of splits generated
-      // Do sanity checks
-      long totalLength = 0;
-      for (InputSplit split : originalSplits) {
-        totalLength += estimator.getEstimatedSize(split);
-      }
-  
-      int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
-      long lengthPerGroup = totalLength/splitCount;
+    return Lists.transform(super
+            .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
+                wrappedInputFormatName, estimator == null ? null :
+                new SplitSizeEstimatorWrapperMapReduce(estimator)),
+        new Function<GroupedSplitContainer, InputSplit>() {
+          @Override
+          public InputSplit apply(GroupedSplitContainer input) {
 
-      long maxLengthPerGroup = conf.getLong(
-          TEZ_GROUPING_SPLIT_MAX_SIZE,
-          TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
-      long minLengthPerGroup = conf.getLong(
-          TEZ_GROUPING_SPLIT_MIN_SIZE,
-          TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
-      if (maxLengthPerGroup < minLengthPerGroup || 
-          minLengthPerGroup <=0) {
-        throw new TezUncheckedException(
-          "Invalid max/min group lengths. Required min>0, max>=min. " +
-          " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
-      }
-      if (lengthPerGroup > maxLengthPerGroup) {
-        // splits too big to work. Need to override with max size.
-        int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too small. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Max splitLength: " + maxLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.size());
-        
-        desiredNumSplits = newDesiredNumSplits;
-      } else if (lengthPerGroup < minLengthPerGroup) {
-        // splits too small to work. Need to override with size.
-        int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
-        LOG.info("Desired splits: " + desiredNumSplits + " too large. " + 
-            " Desired splitLength: " + lengthPerGroup + 
-            " Min splitLength: " + minLengthPerGroup +
-            " New desired splits: " + newDesiredNumSplits + 
-            " Total length: " + totalLength +
-            " Original splits: " + originalSplits.size());
-        
-        desiredNumSplits = newDesiredNumSplits;
-      }
-    }
-     
-    List<InputSplit> groupedSplits = null;
-    
-    if (desiredNumSplits == 0 ||
-        originalSplits.size() == 0 ||
-        desiredNumSplits >= originalSplits.size()) {
-      // nothing set. so return all the splits as is
-      LOG.info("Using original number of splits: " + originalSplits.size() +
-          " desired splits: " + desiredNumSplits);
-      groupedSplits = new ArrayList<InputSplit>(originalSplits.size());
-      for (InputSplit split : originalSplits) {
-        TezGroupedSplit newSplit = 
-            new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
-        newSplit.addSplit(split);
-        groupedSplits.add(newSplit);
-      }
-      return groupedSplits;
-    }
-    
-    String emptyLocation = "EmptyLocation";
-    String[] emptyLocations = {emptyLocation};
-    groupedSplits = new ArrayList<InputSplit>(desiredNumSplits);
-    
-    long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
-    // go through splits and add them to locations
-    for (InputSplit split : originalSplits) {
-      totalLength += estimator.getEstimatedSize(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations ) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        distinctLocations.put(location, null);
-      }
-    }
-    
-    long lengthPerGroup = totalLength/desiredNumSplits;
-    int numNodeLocations = distinctLocations.size();
-    int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
-    int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
-    
-    // allocation loop here so that we have a good initial size for the lists
-    for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
-    }
-    
-    Set<String> locSet = new HashSet<String>();
-    for (InputSplit split : originalSplits) {
-      locSet.clear();
-      SplitHolder splitHolder = new SplitHolder(split);
-      String[] locations = split.getLocations();
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        locSet.add(location);
-      }
-      for (String location : locSet) {
-        LocationHolder holder = distinctLocations.get(location);
-        holder.splits.add(splitHolder);
-      }
-    }
-    
-    boolean groupByLength = conf.getBoolean(
-        TEZ_GROUPING_SPLIT_BY_LENGTH,
-        TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
-    boolean groupByCount = conf.getBoolean(
-        TEZ_GROUPING_SPLIT_BY_COUNT,
-        TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
-    if (!(groupByLength || groupByCount)) {
-      throw new TezUncheckedException(
-          "None of the grouping parameters are true: "
-              + TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
-              + TEZ_GROUPING_SPLIT_BY_COUNT);
-    }
-    LOG.info("Desired numSplits: " + desiredNumSplits +
-        " lengthPerGroup: " + lengthPerGroup +
-        " numLocations: " + numNodeLocations +
-        " numSplitsPerLocation: " + numSplitsPerLocation +
-        " numSplitsInGroup: " + numSplitsInGroup + 
-        " totalLength: " + totalLength +
-        " numOriginalSplits: " + originalSplits.size() +
-        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-    
-    // go through locations and group splits
-    int splitsProcessed = 0;
-    List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup);
-    Set<String> groupLocationSet = new HashSet<String>(10);
-    boolean allowSmallGroups = false;
-    boolean doingRackLocal = false;
-    int iterations = 0;
-    while (splitsProcessed < originalSplits.size()) {
-      iterations++;
-      int numFullGroupsCreated = 0;
-      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-        group.clear();
-        groupLocationSet.clear();
-        String location = entry.getKey();
-        LocationHolder holder = entry.getValue();
-        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
-        if (splitHolder == null) {
-          // all splits on node processed
-          continue;
-        }
-        int oldHeadIndex = holder.headIndex;
-        long groupLength = 0;
-        int groupNumSplits = 0;
-        do {
-          group.add(splitHolder);
-          groupLength += estimator.getEstimatedSize(splitHolder.split);
-          groupNumSplits++;
-          holder.incrementHeadIndex();
-          splitHolder = holder.getUnprocessedHeadSplit();
-        } while(splitHolder != null  
-            && (!groupByLength || 
-                (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
-            && (!groupByCount || 
-                (groupNumSplits + 1 <= numSplitsInGroup)));
+            List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(),
+                new Function<SplitContainer, InputSplit>() {
+                  @Override
+                  public InputSplit apply(SplitContainer input) {
+                    return ((MapReduceSplitContainer) input).getRawSplit();
+                  }
+                });
 
-        if (holder.isEmpty() 
-            && !allowSmallGroups
-            && (!groupByLength || groupLength < lengthPerGroup/2)
-            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
-          // group too small, reset it
-          holder.headIndex = oldHeadIndex;
-          continue;
-        }
-        
-        numFullGroupsCreated++;
+            return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(),
+                input.getLocations(), input.getRack(), input.getLength());
 
-        // One split group created
-        String[] groupLocation = {location};
-        if (location == emptyLocation) {
-          groupLocation = null;
-        } else if (doingRackLocal) {
-          for (SplitHolder splitH : group) {
-            String[] locations = splitH.split.getLocations();
-            if (locations != null) {
-              for (String loc : locations) {
-                if (loc != null) {
-                  groupLocationSet.add(loc);
-                }
-              }
-            }
-          }
-          groupLocation = groupLocationSet.toArray(groupLocation);
-        }
-        TezGroupedSplit groupedSplit = 
-            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
-                groupLocation,
-                // pass rack local hint directly to AM
-                ((doingRackLocal && location != emptyLocation)?location:null));
-        for (SplitHolder groupedSplitHolder : group) {
-          groupedSplit.addSplit(groupedSplitHolder.split);
-          Preconditions.checkState(groupedSplitHolder.isProcessed == false,
-              "Duplicates in grouping at location: " + location);
-          groupedSplitHolder.isProcessed = true;
-          splitsProcessed++;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Grouped " + group.size()
-              + " length: " + groupedSplit.getLength()
-              + " split at: " + location);
-        }
-        groupedSplits.add(groupedSplit);
-      }
-      
-      if (!doingRackLocal && numFullGroupsCreated < 1) {
-        // no node could create a node-local group. go rack-local
-        doingRackLocal = true;
-        // re-create locations
-        int numRemainingSplits = originalSplits.size() - splitsProcessed;
-        Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
-        // gather remaining splits.
-        for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
-          LocationHolder locHolder = entry.getValue();
-          while (!locHolder.isEmpty()) {
-            SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
-            if (splitHolder != null) {
-              remainingSplits.add(splitHolder.split);
-              locHolder.incrementHeadIndex();
-            }
           }
-        }
-        if (remainingSplits.size() != numRemainingSplits) {
-          throw new TezUncheckedException("Expected: " + numRemainingSplits 
-              + " got: " + remainingSplits.size());
-        }
-        
-        // doing all this now instead of up front because the number of remaining
-        // splits is expected to be much smaller
-        RackResolver.init(conf);
-        Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
-        for (String location : distinctLocations.keySet()) {
-          String rack = emptyLocation;
-          if (location != emptyLocation) {
-            rack = RackResolver.resolve(location).getNetworkLocation();
-          }
-          locToRackMap.put(location, rack);
-          if (rackLocations.get(rack) == null) {
-            // splits will probably be located in all racks
-            rackLocations.put(rack, new LocationHolder(numRemainingSplits));
-          }
-        }
-        distinctLocations.clear();
-        HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        int numRackSplitsToGroup = remainingSplits.size();
-        for (InputSplit split : originalSplits) {
-          if (numRackSplitsToGroup == 0) {
-            break;
-          }
-          // Iterate through the original splits in their order and consider them for grouping. 
-          // This maintains the original ordering in the list and thus subsequent grouping will 
-          // maintain that order
-          if (!remainingSplits.contains(split)) {
-            continue;
-          }
-          numRackSplitsToGroup--;
-          rackSet.clear();
-          SplitHolder splitHolder = new SplitHolder(split);
-          String[] locations = split.getLocations();
-          if (locations == null || locations.length == 0) {
-            locations = emptyLocations;
-          }
-          for (String location : locations ) {
-            if (location == null) {
-              location = emptyLocation;
-            }
-            rackSet.add(locToRackMap.get(location));
-          }
-          for (String rack : rackSet) {
-            rackLocations.get(rack).splits.add(splitHolder);
-          }
-        }
-        
-        remainingSplits.clear();
-        distinctLocations = rackLocations;
-        // adjust split length to be smaller because the data is non local
-        float rackSplitReduction = conf.getFloat(
-            TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
-            TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
-        if (rackSplitReduction > 0) {
-          long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
-          int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
-          if (newLengthPerGroup > 0) {
-            lengthPerGroup = newLengthPerGroup;
-          }
-          if (newNumSplitsInGroup > 0) {
-            numSplitsInGroup = newNumSplitsInGroup;
-          }
-        }
-        
-        LOG.info("Doing rack local after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size() +
-            " lengthPerGroup: " + lengthPerGroup +
-            " numSplitsInGroup: " + numSplitsInGroup);
-        
-        // dont do smallGroups for the first pass
-        continue;
-      }
-      
-      if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
-        // a few nodes have a lot of data or data is thinly spread across nodes
-        // so allow small groups now        
-        allowSmallGroups = true;
-        LOG.info("Allowing small groups after iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size());
-      }
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Iteration: " + iterations +
-            " splitsProcessed: " + splitsProcessed + 
-            " numFullGroupsInRound: " + numFullGroupsCreated +
-            " totalGroups: " + groupedSplits.size());
-      }
-    }
-    LOG.info("Number of splits desired: " + desiredNumSplits + 
-        " created: " + groupedSplits.size() + 
-        " splitsProcessed: " + splitsProcessed);
-    return groupedSplits;
+        });
   }
-  
+
   /**
    * Builder that can be used to configure grouping in Tez
-   * 
+   *
+   * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder#newConfigBuilder(Configuration)}
+   *
    * @param conf
    *          {@link Configuration} This will be modified in place. If
    *          configuration values may be changed at runtime via a config file
@@ -538,10 +191,15 @@ public class TezMapReduceSplitsGrouper {
    *          be derived from the Configuration object.
    * @return {@link org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper.TezMRSplitsGrouperConfigBuilder}
    */
+  @Deprecated
   public static TezMRSplitsGrouperConfigBuilder createConfigBuilder(Configuration conf) {
     return new TezMRSplitsGrouperConfigBuilder(conf);
-  }  
+  }
 
+  /**
+   * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder}
+   */
+  @Deprecated
   public static final class TezMRSplitsGrouperConfigBuilder {
     private final Configuration conf;
 
@@ -556,27 +214,27 @@ public class TezMapReduceSplitsGrouper {
     }
 
     public TezMRSplitsGrouperConfigBuilder setGroupSplitCount(int count) {
-      this.conf.setInt(TEZ_GROUPING_SPLIT_COUNT, count);
+      this.conf.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, count);
       return this;
     }
 
     public TezMRSplitsGrouperConfigBuilder setGroupSplitByCount(boolean enabled) {
-      this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_COUNT, enabled);
+      this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, enabled);
       return this;
     }
 
     public TezMRSplitsGrouperConfigBuilder setGroupSplitByLength(boolean enabled) {
-      this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_LENGTH, enabled);
+      this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, enabled);
       return this;
     }
 
     public TezMRSplitsGrouperConfigBuilder setGroupSplitWaves(float multiplier) {
-      this.conf.setFloat(TEZ_GROUPING_SPLIT_WAVES, multiplier);
+      this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES, multiplier);
       return this;
     }
 
     public TezMRSplitsGrouperConfigBuilder setGroupingRackSplitSizeReduction(float rackSplitSizeReduction) {
-      this.conf.setFloat(TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction);
+      this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction);
       return this;
     }
 
@@ -584,8 +242,8 @@ public class TezMapReduceSplitsGrouper {
      * upper and lower bounds for the splits
      */
     public TezMRSplitsGrouperConfigBuilder setGroupingSplitSize(long lowerBound, long upperBound) {
-      this.conf.setLong(TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound);
-      this.conf.setLong(TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound);
+      this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound);
+      this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound);
       return this;
     }
 
@@ -593,5 +251,4 @@ public class TezMapReduceSplitsGrouper {
       return this.conf;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index b93e4ba..ac64bf7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -23,6 +23,7 @@ import java.util.List;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -97,8 +98,8 @@ public class MRInputAMSplitGenerator extends InputInitializer {
     int totalResource = getContext().getTotalAvailableResource().getMemory();
     int taskResource = getContext().getVertexTaskResource().getMemory();
     float waves = conf.getFloat(
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
-        TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+        TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES,
+        TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
     int numTasks = (int)((totalResource*waves)/taskResource);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
new file mode 100644
index 0000000..c236257
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+
+/**
+ * An entity to hold grouped splits - either mapred or mapreduce.
+ */
+@InterfaceAudience.Private
+public class GroupedSplitContainer {
+
+  private final List<SplitContainer> wrappedSplits;
+  private final String wrappedInputFormatName;
+  private final String[] locations;
+  private final String rack;
+  long length = 0;
+
+  public GroupedSplitContainer(int numSplits, String wrappedInputFormatName,
+                               String[] locations, String rack) {
+    this.wrappedSplits = Lists.newArrayListWithCapacity(numSplits);
+    this.wrappedInputFormatName = wrappedInputFormatName;
+    this.locations = locations;
+    this.rack = rack;
+  }
+
+
+  public void addSplit(SplitContainer splitContainer) {
+    wrappedSplits.add(splitContainer);
+    try {
+      length += splitContainer.getLength();
+    } catch (Exception e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public String getWrappedInputFormatName() {
+    return this.wrappedInputFormatName;
+  }
+
+  public List<SplitContainer> getWrappedSplitContainers() {
+    return this.wrappedSplits;
+  }
+
+  public String[] getLocations() {
+    return this.locations;
+  }
+
+  public String getRack() {
+    return this.rack;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
new file mode 100644
index 0000000..63e2138
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * A {@link SplitContainer} backed by a single, non-null
+ * {@code org.apache.hadoop.mapreduce.InputSplit}.
+ */
+public class MapReduceSplitContainer extends SplitContainer {
+
+  private final InputSplit inputSplit;
+
+  /**
+   * Wraps the given split.
+   *
+   * @param inputSplit the mapreduce split to wrap
+   * @throws NullPointerException if {@code inputSplit} is null
+   */
+  public MapReduceSplitContainer(InputSplit inputSplit) {
+    // checkNotNull returns its argument, so validation and assignment happen together.
+    this.inputSplit = Preconditions.checkNotNull(inputSplit);
+  }
+
+  /** Returns the locations reported by the wrapped split. */
+  @Override
+  public String[] getPreferredLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  /** Returns the length reported by the wrapped split. */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  /** Returns the wrapped split itself. */
+  public InputSplit getRawSplit() {
+    return this.inputSplit;
+  }
+
+  /**
+   * Two containers are equal when they are of the same class and their wrapped
+   * splits are equal according to the split's own {@code equals}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // inputSplit is final and the constructor rejects null, so no null handling is needed.
+    return inputSplit.equals(((MapReduceSplitContainer) o).inputSplit);
+  }
+
+  /** Returns the hash code of the wrapped split, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return inputSplit.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
new file mode 100644
index 0000000..f7dbfda
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A {@link SplitContainer} backed by a single, non-null
+ * {@code org.apache.hadoop.mapred.InputSplit}.
+ */
+public class MapredSplitContainer extends SplitContainer {
+
+  private final InputSplit inputSplit;
+
+  /**
+   * Wraps the given split.
+   *
+   * @param inputSplit the mapred split to wrap
+   * @throws NullPointerException if {@code inputSplit} is null
+   */
+  public MapredSplitContainer(InputSplit inputSplit) {
+    // checkNotNull returns its argument, so validation and assignment happen together.
+    this.inputSplit = Preconditions.checkNotNull(inputSplit);
+  }
+
+  /** Returns the locations reported by the wrapped split. */
+  @Override
+  public String[] getPreferredLocations() throws IOException {
+    return inputSplit.getLocations();
+  }
+
+  /** Returns the length reported by the wrapped split. */
+  @Override
+  public long getLength() throws IOException {
+    return inputSplit.getLength();
+  }
+
+  /** Returns the wrapped split itself. */
+  public InputSplit getRawSplit() {
+    return this.inputSplit;
+  }
+
+  /**
+   * Two containers are equal when they are of the same class and their wrapped
+   * splits are equal according to the split's own {@code equals}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    // inputSplit is final and the constructor rejects null, so no null handling is needed.
+    return inputSplit.equals(((MapredSplitContainer) o).inputSplit);
+  }
+
+  /** Returns the hash code of the wrapped split, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return inputSplit.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
new file mode 100644
index 0000000..383b9ca
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Abstract base class used to represent both mapred and mapreduce splits. It also
+ * carries a mutable "processed" flag, which starts out {@code false}.
+ */
+@InterfaceAudience.Private
+public abstract class SplitContainer {
+
+  private boolean isProcessed = false;
+
+
+  /** Returns the preferred locations of the underlying split. */
+  public abstract String[] getPreferredLocations() throws IOException, InterruptedException;
+
+  /** Returns the length of the underlying split. */
+  public abstract long getLength() throws IOException, InterruptedException;
+
+  /** Returns the value last stored by {@link #setIsProcessed(boolean)}, or false if never set. */
+  public boolean isProcessed() {
+    return isProcessed;
+  }
+
+  /** Sets the processed flag to {@code val}. */
+  public void setIsProcessed(boolean val) {
+    this.isProcessed = val;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
new file mode 100644
index 0000000..ebb33ad
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+
+/**
+ * An interface to handle split size estimation across mapred and mapreduce splits.
+ * Implementations receive the API-neutral {@link SplitContainer} and return an
+ * estimated size for the split it holds.
+ */
+@InterfaceAudience.Private
+public interface SplitSizeEstimatorWrapper {
+
+  /**
+   * Returns the estimated size of the split held by the given container.
+   *
+   * @param splitContainer container wrapping the split to be sized
+   * @return the estimated size
+   * @throws IOException declared so implementations can propagate it from the underlying estimator
+   * @throws InterruptedException declared so implementations can propagate it likewise
+   */
+  long getEstimatedSize(SplitContainer splitContainer) throws IOException, InterruptedException;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
new file mode 100644
index 0000000..df6e3c9
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.split.SplitSizeEstimator;
+
+/**
+ * {@link SplitSizeEstimatorWrapper} that forwards size requests to a mapreduce
+ * {@code SplitSizeEstimator}.
+ */
+public class SplitSizeEstimatorWrapperMapReduce implements SplitSizeEstimatorWrapper {
+
+  private final SplitSizeEstimator estimator;
+
+  /** Stores the estimator that size requests are forwarded to. */
+  public SplitSizeEstimatorWrapperMapReduce(SplitSizeEstimator estimator) {
+    this.estimator = estimator;
+  }
+
+  /**
+   * Casts the container to {@link MapReduceSplitContainer} and asks the estimator
+   * for the size of the raw split it holds.
+   */
+  @Override
+  public long getEstimatedSize(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    return estimator.getEstimatedSize(((MapReduceSplitContainer) rawContainer).getRawSplit());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
new file mode 100644
index 0000000..6dd3a56
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
+
+/**
+ * {@link SplitSizeEstimatorWrapper} that forwards size requests to a mapred
+ * {@code SplitSizeEstimator}.
+ */
+public class SplitSizeEstimatorWrapperMapred implements SplitSizeEstimatorWrapper {
+
+  private final SplitSizeEstimator estimator;
+
+  /** Stores the estimator that size requests are forwarded to. */
+  public SplitSizeEstimatorWrapperMapred(SplitSizeEstimator estimator) {
+    this.estimator = estimator;
+  }
+
+  /**
+   * Casts the container to {@link MapredSplitContainer} and asks the estimator
+   * for the size of the raw split it holds.
+   */
+  @Override
+  public long getEstimatedSize(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    return estimator.getEstimatedSize(((MapredSplitContainer) rawContainer).getRawSplit());
+  }
+}