You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/10/15 02:45:11 UTC
[2/2] tez git commit: TEZ-1692. Reduce code duplication between
TezMapredSplitsGrouper and TezMapreduceSplitsGrouper. (sseth)
TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and
TezMapreduceSplitsGrouper. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6632903b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6632903b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6632903b
Branch: refs/heads/master
Commit: 6632903bb3cb70b4717b64f3f78664f34812ec5b
Parents: c89e352
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 14 17:42:35 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 14 17:42:35 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-mapreduce/findbugs-exclude.xml | 14 +
.../hadoop/mapred/split/TezGroupedSplit.java | 11 +-
.../mapred/split/TezMapredSplitsGrouper.java | 459 ++-------------
.../hadoop/mapreduce/split/TezGroupedSplit.java | 19 +-
.../split/TezMapReduceSplitsGrouper.java | 573 ++++---------------
.../common/MRInputAMSplitGenerator.java | 5 +-
.../grouper/GroupedSplitContainer.java | 74 +++
.../grouper/MapReduceSplitContainer.java | 64 +++
.../mapreduce/grouper/MapredSplitContainer.java | 64 +++
.../tez/mapreduce/grouper/SplitContainer.java | 41 ++
.../grouper/SplitSizeEstimatorWrapper.java | 30 +
.../SplitSizeEstimatorWrapperMapReduce.java | 35 ++
.../SplitSizeEstimatorWrapperMapred.java | 35 ++
.../tez/mapreduce/grouper/TezSplitGrouper.java | 571 ++++++++++++++++++
.../hadoop/mapred/split/TestGroupedSplits.java | 11 +-
16 files changed, 1120 insertions(+), 887 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17735b5..7c2f030 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
TEZ-2887. Tez build failure due to missing dependency in pom files.
+ TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper.
Release 0.8.1-alpha: 2015-10-12
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/findbugs-exclude.xml b/tez-mapreduce/findbugs-exclude.xml
index 873d4a2..ec64739 100644
--- a/tez-mapreduce/findbugs-exclude.xml
+++ b/tez-mapreduce/findbugs-exclude.xml
@@ -70,6 +70,13 @@
</Match>
<Match>
+ <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/>
+ <Method name="getLocations"/>
+ <Field name="locations"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+
+ <Match>
<Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/>
<Method name="getNewFormatSplits"/>
<Field name="newFormatSplits"/>
@@ -98,6 +105,13 @@
</Match>
<Match>
+ <Class name="org.apache.tez.mapreduce.grouper.GroupedSplitContainer"/>
+ <Method name="<init>"/>
+ <Field name="locations"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
+ <Match>
<Class name="org.apache.tez.mapreduce.hadoop.InputSplitInfoMem"/>
<Method name="<init>"/>
<Field name="oldFormatSplits"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
index a9893aa..bc58043 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java
@@ -53,7 +53,16 @@ public class TezGroupedSplit implements InputSplit, Configurable {
public TezGroupedSplit() {
}
-
+
+ public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName,
+ String[] locations, String rack, long length) {
+ this.wrappedSplits = wrappedSplits;
+ this.wrappedInputFormatName = wrappedInputFormatName;
+ this.locations = locations;
+ this.rack = rack;
+ this.length = length;
+ }
+
public TezGroupedSplit(int numSplits, String wrappedInputFormatName,
String[] locations, String rack) {
this.wrappedSplits = new ArrayList<InputSplit>(numSplits);
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index 2194551..f2a8a0c 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -19,25 +19,23 @@
package org.apache.hadoop.mapred.split;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
+import org.apache.tez.mapreduce.grouper.MapredSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.base.Preconditions;
/**
* A Helper that provides grouping logic to group InputSplits
@@ -46,58 +44,9 @@ import com.google.common.base.Preconditions;
*/
@Public
@Evolving
-public class TezMapredSplitsGrouper {
+public class TezMapredSplitsGrouper extends TezSplitGrouper {
private static final Logger LOG = LoggerFactory.getLogger(TezMapredSplitsGrouper.class);
- static class SplitHolder {
- InputSplit split;
- boolean isProcessed = false;
- SplitHolder(InputSplit split) {
- this.split = split;
- }
- }
-
- static class LocationHolder {
- List<SplitHolder> splits;
- int headIndex = 0;
- LocationHolder(int capacity) {
- splits = new ArrayList<SplitHolder>(capacity);
- }
- boolean isEmpty() {
- return (headIndex == splits.size());
- }
- SplitHolder getUnprocessedHeadSplit() {
- while (!isEmpty()) {
- SplitHolder holder = splits.get(headIndex);
- if (!holder.isProcessed) {
- return holder;
- }
- incrementHeadIndex();
- }
- return null;
- }
- void incrementHeadIndex() {
- headIndex++;
- }
- }
-
- private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
-
- static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
- @Override
- public long getEstimatedSize(InputSplit split) throws IOException {
- return split.getLength();
- }
- }
-
- Map<String, LocationHolder> createLocationsMap(Configuration conf) {
- if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
- return new TreeMap<String, LocationHolder>();
- }
- return new HashMap<String, LocationHolder>();
- }
-
public InputSplit[] getGroupedSplits(Configuration conf,
InputSplit[] originalSplits, int desiredNumSplits,
String wrappedInputFormatName) throws IOException {
@@ -107,367 +56,41 @@ public class TezMapredSplitsGrouper {
public InputSplit[] getGroupedSplits(Configuration conf,
InputSplit[] originalSplits, int desiredNumSplits,
String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException {
- LOG.info("Grouping splits in Tez");
Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
- int configNumSplits = conf.getInt(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_COUNT, 0);
- if (configNumSplits > 0) {
- // always use config override if specified
- desiredNumSplits = configNumSplits;
- LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
- }
-
- if (estimator == null) {
- estimator = DEFAULT_SPLIT_ESTIMATOR;
- }
-
- if (! (configNumSplits > 0 ||
- originalSplits.length == 0) ) {
- // numSplits has not been overridden by config
- // numSplits has been set at runtime
- // there are splits generated
- // Do sanity checks
- long totalLength = 0;
- for (InputSplit split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- }
-
- int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length;
- long lengthPerGroup = totalLength/splitCount;
-
- long maxLengthPerGroup = conf.getLong(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
- long minLengthPerGroup = conf.getLong(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
- if (maxLengthPerGroup < minLengthPerGroup ||
- minLengthPerGroup <=0) {
- throw new TezUncheckedException(
- "Invalid max/min group lengths. Required min>0, max>=min. " +
- " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
- }
- if (lengthPerGroup > maxLengthPerGroup) {
- // splits too big to work. Need to override with max size.
- int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
- LOG.info("Desired splits: " + desiredNumSplits + " too small. " +
- " Desired splitLength: " + lengthPerGroup +
- " Max splitLength: " + maxLengthPerGroup +
- " New desired splits: " + newDesiredNumSplits +
- " Total length: " + totalLength +
- " Original splits: " + originalSplits.length);
-
- desiredNumSplits = newDesiredNumSplits;
- } else if (lengthPerGroup < minLengthPerGroup) {
- // splits too small to work. Need to override with size.
- int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
- LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
- " Desired splitLength: " + lengthPerGroup +
- " Min splitLength: " + minLengthPerGroup +
- " New desired splits: " + newDesiredNumSplits +
- " Total length: " + totalLength +
- " Original splits: " + originalSplits.length);
-
- desiredNumSplits = newDesiredNumSplits;
- }
- }
-
- if (originalSplits == null) {
- LOG.info("Null original splits");
- return null;
- }
-
- if (desiredNumSplits == 0 ||
- originalSplits.length == 0 ||
- desiredNumSplits >= originalSplits.length) {
- // nothing set. so return all the splits as is
- LOG.info("Using original number of splits: " + originalSplits.length +
- " desired splits: " + desiredNumSplits);
- InputSplit[] groupedSplits = new TezGroupedSplit[originalSplits.length];
- int i=0;
- for (InputSplit split : originalSplits) {
- TezGroupedSplit newSplit =
- new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
- newSplit.addSplit(split);
- groupedSplits[i++] = newSplit;
- }
- return groupedSplits;
- }
-
- String emptyLocation = "EmptyLocation";
- String[] emptyLocations = {emptyLocation};
- List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits);
-
- long totalLength = 0;
- Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
- // go through splits and add them to locations
- for (InputSplit split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations ) {
- if (location == null) {
- location = emptyLocation;
- }
- distinctLocations.put(location, null);
- }
- }
-
- long lengthPerGroup = totalLength/desiredNumSplits;
- int numNodeLocations = distinctLocations.size();
- int numSplitsPerLocation = originalSplits.length/numNodeLocations;
- int numSplitsInGroup = originalSplits.length/desiredNumSplits;
-
- // allocation loop here so that we have a good initial size for the lists
- for (String location : distinctLocations.keySet()) {
- distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
- }
-
- Set<String> locSet = new HashSet<String>();
- for (InputSplit split : originalSplits) {
- locSet.clear();
- SplitHolder splitHolder = new SplitHolder(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations) {
- if (location == null) {
- location = emptyLocation;
- }
- locSet.add(location);
- }
- for (String location : locSet) {
- LocationHolder holder = distinctLocations.get(location);
- holder.splits.add(splitHolder);
- }
- }
-
- boolean groupByLength = conf.getBoolean(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
- boolean groupByCount = conf.getBoolean(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
- if (!(groupByLength || groupByCount)) {
- throw new TezUncheckedException(
- "None of the grouping parameters are true: "
- + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
- + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_BY_COUNT);
- }
- LOG.info("Desired numSplits: " + desiredNumSplits +
- " lengthPerGroup: " + lengthPerGroup +
- " numLocations: " + numNodeLocations +
- " numSplitsPerLocation: " + numSplitsPerLocation +
- " numSplitsInGroup: " + numSplitsInGroup +
- " totalLength: " + totalLength +
- " numOriginalSplits: " + originalSplits.length +
- " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-
- // go through locations and group splits
- int splitsProcessed = 0;
- List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup+1);
- Set<String> groupLocationSet = new HashSet<String>(10);
- boolean allowSmallGroups = false;
- boolean doingRackLocal = false;
- int iterations = 0;
- while (splitsProcessed < originalSplits.length) {
- iterations++;
- int numFullGroupsCreated = 0;
- for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
- group.clear();
- groupLocationSet.clear();
- String location = entry.getKey();
- LocationHolder holder = entry.getValue();
- SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
- if (splitHolder == null) {
- // all splits on node processed
- continue;
- }
- int oldHeadIndex = holder.headIndex;
- long groupLength = 0;
- int groupNumSplits = 0;
- do {
- group.add(splitHolder);
- groupLength += estimator.getEstimatedSize(splitHolder.split);
- groupNumSplits++;
- holder.incrementHeadIndex();
- splitHolder = holder.getUnprocessedHeadSplit();
- } while(splitHolder != null
- && (!groupByLength ||
- (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
- && (!groupByCount ||
- (groupNumSplits + 1 <= numSplitsInGroup)));
-
- if (holder.isEmpty()
- && !allowSmallGroups
- && (!groupByLength || groupLength < lengthPerGroup/2)
- && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
- // group too small, reset it
- holder.headIndex = oldHeadIndex;
- continue;
- }
-
- numFullGroupsCreated++;
-
- // One split group created
- String[] groupLocation = {location};
- if (location == emptyLocation) {
- groupLocation = null;
- } else if (doingRackLocal) {
- for (SplitHolder splitH : group) {
- String[] locations = splitH.split.getLocations();
- if (locations != null) {
- for (String loc : locations) {
- if (loc != null) {
- groupLocationSet.add(loc);
- }
- }
- }
- }
- groupLocation = groupLocationSet.toArray(groupLocation);
- }
- TezGroupedSplit groupedSplit =
- new TezGroupedSplit(group.size(), wrappedInputFormatName,
- groupLocation,
- // pass rack local hint directly to AM
- ((doingRackLocal && location != emptyLocation)?location:null));
- for (SplitHolder groupedSplitHolder : group) {
- groupedSplit.addSplit(groupedSplitHolder.split);
- Preconditions.checkState(groupedSplitHolder.isProcessed == false,
- "Duplicates in grouping at location: " + location);
- groupedSplitHolder.isProcessed = true;
- splitsProcessed++;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Grouped " + group.size()
- + " length: " + groupedSplit.getLength()
- + " split at: " + location);
- }
- groupedSplitsList.add(groupedSplit);
- }
-
- if (!doingRackLocal && numFullGroupsCreated < 1) {
- // no node could create a node-local group. go rack-local
- doingRackLocal = true;
- // re-create locations
- int numRemainingSplits = originalSplits.length - splitsProcessed;
- Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
- // gather remaining splits.
- for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
- LocationHolder locHolder = entry.getValue();
- while (!locHolder.isEmpty()) {
- SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
- if (splitHolder != null) {
- remainingSplits.add(splitHolder.split);
- locHolder.incrementHeadIndex();
- }
- }
- }
- if (remainingSplits.size() != numRemainingSplits) {
- throw new TezUncheckedException("Expected: " + numRemainingSplits
- + " got: " + remainingSplits.size());
- }
-
- // doing all this now instead of up front because the number of remaining
- // splits is expected to be much smaller
- RackResolver.init(conf);
- Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
- Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
- for (String location : distinctLocations.keySet()) {
- String rack = emptyLocation;
- if (location != emptyLocation) {
- rack = RackResolver.resolve(location).getNetworkLocation();
- }
- locToRackMap.put(location, rack);
- if (rackLocations.get(rack) == null) {
- // splits will probably be located in all racks
- rackLocations.put(rack, new LocationHolder(numRemainingSplits));
- }
- }
- distinctLocations.clear();
- HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
- int numRackSplitsToGroup = remainingSplits.size();
- for (InputSplit split : originalSplits) {
- if (numRackSplitsToGroup == 0) {
- break;
- }
- // Iterate through the original splits in their order and consider them for grouping.
- // This maintains the original ordering in the list and thus subsequent grouping will
- // maintain that order
- if (!remainingSplits.contains(split)) {
- continue;
- }
- numRackSplitsToGroup--;
- rackSet.clear();
- SplitHolder splitHolder = new SplitHolder(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
+ List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits),
+ new Function<InputSplit, SplitContainer>() {
+ @Override
+ public SplitContainer apply(InputSplit input) {
+ return new MapredSplitContainer(input);
}
- for (String location : locations ) {
- if ( location == null) {
- location = emptyLocation;
+ });
+
+ try {
+ List<InputSplit> resultList = Lists.transform(super
+ .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
+ wrappedInputFormatName, estimator == null ? null :
+ new SplitSizeEstimatorWrapperMapred(estimator)),
+ new Function<GroupedSplitContainer, InputSplit>() {
+ @Override
+ public InputSplit apply(GroupedSplitContainer input) {
+ List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(),
+ new Function<SplitContainer, InputSplit>() {
+ @Override
+ public InputSplit apply(SplitContainer input) {
+ return ((MapredSplitContainer) input).getRawSplit();
+ }
+ });
+
+
+ return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(),
+ input.getLocations(), input.getRack(), input.getLength());
}
- rackSet.add(locToRackMap.get(location));
- }
- for (String rack : rackSet) {
- rackLocations.get(rack).splits.add(splitHolder);
- }
- }
- remainingSplits.clear();
- distinctLocations = rackLocations;
- // adjust split length to be smaller because the data is non local
- float rackSplitReduction = conf.getFloat(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
- if (rackSplitReduction > 0) {
- long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
- int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
- if (newLengthPerGroup > 0) {
- lengthPerGroup = newLengthPerGroup;
- }
- if (newNumSplitsInGroup > 0) {
- numSplitsInGroup = newNumSplitsInGroup;
- }
- }
-
- LOG.info("Doing rack local after iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplitsList.size() +
- " lengthPerGroup: " + lengthPerGroup +
- " numSplitsInGroup: " + numSplitsInGroup);
-
- // dont do smallGroups for the first pass
- continue;
- }
-
- if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
- // a few nodes have a lot of data or data is thinly spread across nodes
- // so allow small groups now
- allowSmallGroups = true;
- LOG.info("Allowing small groups after iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplitsList.size());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplitsList.size());
- }
+ });
+ InputSplit[] resultArr = resultList.toArray(new InputSplit[resultList.size()]);
+ return resultArr;
+ } catch (InterruptedException e) {
+ throw new IOException(e);
}
- InputSplit[] groupedSplits = new InputSplit[groupedSplitsList.size()];
- groupedSplitsList.toArray(groupedSplits);
- LOG.info("Number of splits desired: " + desiredNumSplits +
- " created: " + groupedSplitsList.size() +
- " splitsProcessed: " + splitsProcessed);
- return groupedSplits;
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
index 430d2ec..2d198ad 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
@@ -50,11 +51,25 @@ public class TezGroupedSplit extends InputSplit
String rack = null;
long length = 0;
Configuration conf;
-
+
+ @InterfaceAudience.Private
public TezGroupedSplit() {
}
-
+
+ /**
+ * Meant for internal usage only. Stores the given list and locations array as-is (no copies).
+ */
+ @InterfaceAudience.Private
+ public TezGroupedSplit(List<InputSplit> wrappedSplits, String wrappedInputFormatName,
+ String[] locations, String rack, long length) {
+ this.wrappedSplits = wrappedSplits;
+ this.wrappedInputFormatName = wrappedInputFormatName;
+ this.locations = locations;
+ this.rack = rack;
+ this.length = length;
+ }
+
public TezGroupedSplit(int numSplits, String wrappedInputFormatName,
String[] locations, String rack) {
this.wrappedSplits = new ArrayList<InputSplit>(numSplits);
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 4be3931..87729bd 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -19,26 +19,24 @@
package org.apache.hadoop.mapreduce.split;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
import javax.annotation.Nullable;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
+import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapReduce;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-import com.google.common.base.Preconditions;
/**
* Helper that provides a grouping of input splits based
@@ -47,117 +45,94 @@ import com.google.common.base.Preconditions;
*/
@Public
@Evolving
-public class TezMapReduceSplitsGrouper {
+public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
private static final Logger LOG = LoggerFactory.getLogger(TezMapReduceSplitsGrouper.class);
/**
- * Specify the number of splits desired to be created
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_COUNT = "tez.grouping.split-count";
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT;
+
/**
- * Limit the number of splits in a group by the total length of the splits in the group
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = "tez.grouping.by-length";
- public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = true;
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_BY_LENGTH = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH;
/**
- * Limit the number of splits in a group by the number of splits in the group
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_BY_COUNT = "tez.grouping.by-count";
- public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = false;
+ @Deprecated
+ public static final boolean TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT;
/**
- * The multiplier for available queue capacity when determining number of
- * tasks for a Vertex. 1.7 with 100% queue available implies generating a
- * number of tasks roughly equal to 170% of the available containers on the
- * queue. This enables multiple waves of mappers where the final wave is slightly smaller
- * than the remaining waves. The gap helps overlap the final wave with any slower tasks
- * from previous waves and tries to hide the delays from the slower tasks. Good values for
- * this are 1.7, 2.7, 3.7 etc. Increase the number of waves to make the tasks smaller or
- * shorter.
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_WAVES = "tez.grouping.split-waves";
- public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = 1.7f;
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_BY_COUNT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT;
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final boolean TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT;
/**
- * Upper bound on the size (in bytes) of a grouped split, to avoid generating excessively large splits.
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_WAVES = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES;
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = "tez.grouping.max-size";
- public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = 1024*1024*1024L;
+ @Deprecated
+ public static final float TEZ_GROUPING_SPLIT_WAVES_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT;
/**
- * Lower bound on the size (in bytes) of a grouped split, to avoid generating too many small splits.
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_MAX_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE;
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = "tez.grouping.min-size";
- public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = 50*1024*1024L;
+ @Deprecated
+ public static final long TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT;
/**
- * This factor is used to decrease the per group desired (length and count) limits for groups
- * created by combining splits within a rack. Since reading this split involves reading data intra
- * rack, the group is made smaller to cover up for the increased latencies in doing intra rack
- * reads. The value should be a fraction <= 1.
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION =
- "tez.grouping.rack-split-reduction";
- public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f;
-
+ @Deprecated
+ public static final String TEZ_GROUPING_SPLIT_MIN_SIZE = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE;
/**
- * Repeated invocations of grouping on the same splits with the same parameters will produce the
- * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a
- * large number of jobs reading the same hot data. True by default.
+ * @deprecated See equivalent in {@link TezSplitGrouper}
*/
- public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable";
- public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
+ @Deprecated
+ public static final long TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT;
- static class SplitHolder {
- InputSplit split;
- boolean isProcessed = false;
- SplitHolder(InputSplit split) {
- this.split = split;
- }
- }
-
- static class LocationHolder {
- List<SplitHolder> splits;
- int headIndex = 0;
- LocationHolder(int capacity) {
- splits = new ArrayList<SplitHolder>(capacity);
- }
- boolean isEmpty() {
- return (headIndex == splits.size());
- }
- SplitHolder getUnprocessedHeadSplit() {
- while (!isEmpty()) {
- SplitHolder holder = splits.get(headIndex);
- if (!holder.isProcessed) {
- return holder;
- }
- incrementHeadIndex();
- }
- return null;
- }
- void incrementHeadIndex() {
- headIndex++;
- }
- }
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION =
+ TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION;
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT;
- private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final String TEZ_GROUPING_REPEATABLE = TezSplitGrouper.TEZ_GROUPING_REPEATABLE;
+ /**
+ * @deprecated See equivalent in {@link TezSplitGrouper}
+ */
+ @Deprecated
+ public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = TezSplitGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT;
- static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
- @Override
- public long getEstimatedSize(InputSplit split) throws InterruptedException,
- IOException {
- return split.getLength();
- }
- }
- Map<String, LocationHolder> createLocationsMap(Configuration conf) {
- if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
- return new TreeMap<String, LocationHolder>();
- }
- return new HashMap<String, LocationHolder>();
- }
-
public List<InputSplit> getGroupedSplits(Configuration conf,
List<InputSplit> originalSplits, int desiredNumSplits,
String wrappedInputFormatName) throws IOException, InterruptedException {
@@ -166,370 +141,48 @@ public class TezMapReduceSplitsGrouper {
}
public List<InputSplit> getGroupedSplits(Configuration conf,
- List<InputSplit> originalSplits, int desiredNumSplits,
- String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException {
- LOG.info("Grouping splits in Tez");
+ List<InputSplit> originalSplits, int desiredNumSplits,
+ String wrappedInputFormatName,
+ SplitSizeEstimator estimator) throws IOException,
+ InterruptedException {
Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
+ List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits,
+ new Function<InputSplit, SplitContainer>() {
+ @Override
+ public SplitContainer apply(InputSplit input) {
+ return new MapReduceSplitContainer(input);
+ }
+ });
- int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
- if (configNumSplits > 0) {
- // always use config override if specified
- desiredNumSplits = configNumSplits;
- LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
- }
-
- if (estimator == null) {
- estimator = DEFAULT_SPLIT_ESTIMATOR;
- }
- if (! (configNumSplits > 0 ||
- originalSplits.size() == 0)) {
- // numSplits has not been overridden by config
- // numSplits has been set at runtime
- // there are splits generated
- // desired splits is less than number of splits generated
- // Do sanity checks
- long totalLength = 0;
- for (InputSplit split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- }
-
- int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
- long lengthPerGroup = totalLength/splitCount;
+ return Lists.transform(super
+ .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
+ wrappedInputFormatName, estimator == null ? null :
+ new SplitSizeEstimatorWrapperMapReduce(estimator)),
+ new Function<GroupedSplitContainer, InputSplit>() {
+ @Override
+ public InputSplit apply(GroupedSplitContainer input) {
- long maxLengthPerGroup = conf.getLong(
- TEZ_GROUPING_SPLIT_MAX_SIZE,
- TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
- long minLengthPerGroup = conf.getLong(
- TEZ_GROUPING_SPLIT_MIN_SIZE,
- TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
- if (maxLengthPerGroup < minLengthPerGroup ||
- minLengthPerGroup <=0) {
- throw new TezUncheckedException(
- "Invalid max/min group lengths. Required min>0, max>=min. " +
- " max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
- }
- if (lengthPerGroup > maxLengthPerGroup) {
- // splits too big to work. Need to override with max size.
- int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
- LOG.info("Desired splits: " + desiredNumSplits + " too small. " +
- " Desired splitLength: " + lengthPerGroup +
- " Max splitLength: " + maxLengthPerGroup +
- " New desired splits: " + newDesiredNumSplits +
- " Total length: " + totalLength +
- " Original splits: " + originalSplits.size());
-
- desiredNumSplits = newDesiredNumSplits;
- } else if (lengthPerGroup < minLengthPerGroup) {
- // splits too small to work. Need to override with size.
- int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
- LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
- " Desired splitLength: " + lengthPerGroup +
- " Min splitLength: " + minLengthPerGroup +
- " New desired splits: " + newDesiredNumSplits +
- " Total length: " + totalLength +
- " Original splits: " + originalSplits.size());
-
- desiredNumSplits = newDesiredNumSplits;
- }
- }
-
- List<InputSplit> groupedSplits = null;
-
- if (desiredNumSplits == 0 ||
- originalSplits.size() == 0 ||
- desiredNumSplits >= originalSplits.size()) {
- // nothing set. so return all the splits as is
- LOG.info("Using original number of splits: " + originalSplits.size() +
- " desired splits: " + desiredNumSplits);
- groupedSplits = new ArrayList<InputSplit>(originalSplits.size());
- for (InputSplit split : originalSplits) {
- TezGroupedSplit newSplit =
- new TezGroupedSplit(1, wrappedInputFormatName, split.getLocations());
- newSplit.addSplit(split);
- groupedSplits.add(newSplit);
- }
- return groupedSplits;
- }
-
- String emptyLocation = "EmptyLocation";
- String[] emptyLocations = {emptyLocation};
- groupedSplits = new ArrayList<InputSplit>(desiredNumSplits);
-
- long totalLength = 0;
- Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
- // go through splits and add them to locations
- for (InputSplit split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations ) {
- if (location == null) {
- location = emptyLocation;
- }
- distinctLocations.put(location, null);
- }
- }
-
- long lengthPerGroup = totalLength/desiredNumSplits;
- int numNodeLocations = distinctLocations.size();
- int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
- int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
-
- // allocation loop here so that we have a good initial size for the lists
- for (String location : distinctLocations.keySet()) {
- distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
- }
-
- Set<String> locSet = new HashSet<String>();
- for (InputSplit split : originalSplits) {
- locSet.clear();
- SplitHolder splitHolder = new SplitHolder(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations) {
- if (location == null) {
- location = emptyLocation;
- }
- locSet.add(location);
- }
- for (String location : locSet) {
- LocationHolder holder = distinctLocations.get(location);
- holder.splits.add(splitHolder);
- }
- }
-
- boolean groupByLength = conf.getBoolean(
- TEZ_GROUPING_SPLIT_BY_LENGTH,
- TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
- boolean groupByCount = conf.getBoolean(
- TEZ_GROUPING_SPLIT_BY_COUNT,
- TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
- if (!(groupByLength || groupByCount)) {
- throw new TezUncheckedException(
- "None of the grouping parameters are true: "
- + TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
- + TEZ_GROUPING_SPLIT_BY_COUNT);
- }
- LOG.info("Desired numSplits: " + desiredNumSplits +
- " lengthPerGroup: " + lengthPerGroup +
- " numLocations: " + numNodeLocations +
- " numSplitsPerLocation: " + numSplitsPerLocation +
- " numSplitsInGroup: " + numSplitsInGroup +
- " totalLength: " + totalLength +
- " numOriginalSplits: " + originalSplits.size() +
- " . Grouping by length: " + groupByLength + " count: " + groupByCount);
-
- // go through locations and group splits
- int splitsProcessed = 0;
- List<SplitHolder> group = new ArrayList<SplitHolder>(numSplitsInGroup);
- Set<String> groupLocationSet = new HashSet<String>(10);
- boolean allowSmallGroups = false;
- boolean doingRackLocal = false;
- int iterations = 0;
- while (splitsProcessed < originalSplits.size()) {
- iterations++;
- int numFullGroupsCreated = 0;
- for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
- group.clear();
- groupLocationSet.clear();
- String location = entry.getKey();
- LocationHolder holder = entry.getValue();
- SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
- if (splitHolder == null) {
- // all splits on node processed
- continue;
- }
- int oldHeadIndex = holder.headIndex;
- long groupLength = 0;
- int groupNumSplits = 0;
- do {
- group.add(splitHolder);
- groupLength += estimator.getEstimatedSize(splitHolder.split);
- groupNumSplits++;
- holder.incrementHeadIndex();
- splitHolder = holder.getUnprocessedHeadSplit();
- } while(splitHolder != null
- && (!groupByLength ||
- (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
- && (!groupByCount ||
- (groupNumSplits + 1 <= numSplitsInGroup)));
+ List<InputSplit> underlyingSplits = Lists.transform(input.getWrappedSplitContainers(),
+ new Function<SplitContainer, InputSplit>() {
+ @Override
+ public InputSplit apply(SplitContainer input) {
+ return ((MapReduceSplitContainer) input).getRawSplit();
+ }
+ });
- if (holder.isEmpty()
- && !allowSmallGroups
- && (!groupByLength || groupLength < lengthPerGroup/2)
- && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
- // group too small, reset it
- holder.headIndex = oldHeadIndex;
- continue;
- }
-
- numFullGroupsCreated++;
+ return new TezGroupedSplit(underlyingSplits, input.getWrappedInputFormatName(),
+ input.getLocations(), input.getRack(), input.getLength());
- // One split group created
- String[] groupLocation = {location};
- if (location == emptyLocation) {
- groupLocation = null;
- } else if (doingRackLocal) {
- for (SplitHolder splitH : group) {
- String[] locations = splitH.split.getLocations();
- if (locations != null) {
- for (String loc : locations) {
- if (loc != null) {
- groupLocationSet.add(loc);
- }
- }
- }
- }
- groupLocation = groupLocationSet.toArray(groupLocation);
- }
- TezGroupedSplit groupedSplit =
- new TezGroupedSplit(group.size(), wrappedInputFormatName,
- groupLocation,
- // pass rack local hint directly to AM
- ((doingRackLocal && location != emptyLocation)?location:null));
- for (SplitHolder groupedSplitHolder : group) {
- groupedSplit.addSplit(groupedSplitHolder.split);
- Preconditions.checkState(groupedSplitHolder.isProcessed == false,
- "Duplicates in grouping at location: " + location);
- groupedSplitHolder.isProcessed = true;
- splitsProcessed++;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Grouped " + group.size()
- + " length: " + groupedSplit.getLength()
- + " split at: " + location);
- }
- groupedSplits.add(groupedSplit);
- }
-
- if (!doingRackLocal && numFullGroupsCreated < 1) {
- // no node could create a node-local group. go rack-local
- doingRackLocal = true;
- // re-create locations
- int numRemainingSplits = originalSplits.size() - splitsProcessed;
- Set<InputSplit> remainingSplits = new HashSet<InputSplit>(numRemainingSplits);
- // gather remaining splits.
- for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
- LocationHolder locHolder = entry.getValue();
- while (!locHolder.isEmpty()) {
- SplitHolder splitHolder = locHolder.getUnprocessedHeadSplit();
- if (splitHolder != null) {
- remainingSplits.add(splitHolder.split);
- locHolder.incrementHeadIndex();
- }
}
- }
- if (remainingSplits.size() != numRemainingSplits) {
- throw new TezUncheckedException("Expected: " + numRemainingSplits
- + " got: " + remainingSplits.size());
- }
-
- // doing all this now instead of up front because the number of remaining
- // splits is expected to be much smaller
- RackResolver.init(conf);
- Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
- Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
- for (String location : distinctLocations.keySet()) {
- String rack = emptyLocation;
- if (location != emptyLocation) {
- rack = RackResolver.resolve(location).getNetworkLocation();
- }
- locToRackMap.put(location, rack);
- if (rackLocations.get(rack) == null) {
- // splits will probably be located in all racks
- rackLocations.put(rack, new LocationHolder(numRemainingSplits));
- }
- }
- distinctLocations.clear();
- HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
- int numRackSplitsToGroup = remainingSplits.size();
- for (InputSplit split : originalSplits) {
- if (numRackSplitsToGroup == 0) {
- break;
- }
- // Iterate through the original splits in their order and consider them for grouping.
- // This maintains the original ordering in the list and thus subsequent grouping will
- // maintain that order
- if (!remainingSplits.contains(split)) {
- continue;
- }
- numRackSplitsToGroup--;
- rackSet.clear();
- SplitHolder splitHolder = new SplitHolder(split);
- String[] locations = split.getLocations();
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations ) {
- if (location == null) {
- location = emptyLocation;
- }
- rackSet.add(locToRackMap.get(location));
- }
- for (String rack : rackSet) {
- rackLocations.get(rack).splits.add(splitHolder);
- }
- }
-
- remainingSplits.clear();
- distinctLocations = rackLocations;
- // adjust split length to be smaller because the data is non local
- float rackSplitReduction = conf.getFloat(
- TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
- TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
- if (rackSplitReduction > 0) {
- long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
- int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
- if (newLengthPerGroup > 0) {
- lengthPerGroup = newLengthPerGroup;
- }
- if (newNumSplitsInGroup > 0) {
- numSplitsInGroup = newNumSplitsInGroup;
- }
- }
-
- LOG.info("Doing rack local after iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplits.size() +
- " lengthPerGroup: " + lengthPerGroup +
- " numSplitsInGroup: " + numSplitsInGroup);
-
- // dont do smallGroups for the first pass
- continue;
- }
-
- if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
- // a few nodes have a lot of data or data is thinly spread across nodes
- // so allow small groups now
- allowSmallGroups = true;
- LOG.info("Allowing small groups after iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplits.size());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Iteration: " + iterations +
- " splitsProcessed: " + splitsProcessed +
- " numFullGroupsInRound: " + numFullGroupsCreated +
- " totalGroups: " + groupedSplits.size());
- }
- }
- LOG.info("Number of splits desired: " + desiredNumSplits +
- " created: " + groupedSplits.size() +
- " splitsProcessed: " + splitsProcessed);
- return groupedSplits;
+ });
}
-
+
/**
* Builder that can be used to configure grouping in Tez
- *
+ *
+ * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder#newConfigBuilder(Configuration)}
+ *
* @param conf
* {@link Configuration} This will be modified in place. If
* configuration values may be changed at runtime via a config file
@@ -538,10 +191,15 @@ public class TezMapReduceSplitsGrouper {
* be derived from the Configuration object.
* @return {@link org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper.TezMRSplitsGrouperConfigBuilder}
*/
+ @Deprecated
public static TezMRSplitsGrouperConfigBuilder createConfigBuilder(Configuration conf) {
return new TezMRSplitsGrouperConfigBuilder(conf);
- }
+ }
+ /**
+ * @deprecated See {@link org.apache.tez.mapreduce.grouper.TezSplitGrouper.TezMRSplitsGrouperConfigBuilder}
+ */
+ @Deprecated
public static final class TezMRSplitsGrouperConfigBuilder {
private final Configuration conf;
@@ -556,27 +214,27 @@ public class TezMapReduceSplitsGrouper {
}
public TezMRSplitsGrouperConfigBuilder setGroupSplitCount(int count) {
- this.conf.setInt(TEZ_GROUPING_SPLIT_COUNT, count);
+ this.conf.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, count);
return this;
}
public TezMRSplitsGrouperConfigBuilder setGroupSplitByCount(boolean enabled) {
- this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_COUNT, enabled);
+ this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, enabled);
return this;
}
public TezMRSplitsGrouperConfigBuilder setGroupSplitByLength(boolean enabled) {
- this.conf.setBoolean(TEZ_GROUPING_SPLIT_BY_LENGTH, enabled);
+ this.conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, enabled);
return this;
}
public TezMRSplitsGrouperConfigBuilder setGroupSplitWaves(float multiplier) {
- this.conf.setFloat(TEZ_GROUPING_SPLIT_WAVES, multiplier);
+ this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES, multiplier);
return this;
}
public TezMRSplitsGrouperConfigBuilder setGroupingRackSplitSizeReduction(float rackSplitSizeReduction) {
- this.conf.setFloat(TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction);
+ this.conf.setFloat(TezSplitGrouper.TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION, rackSplitSizeReduction);
return this;
}
@@ -584,8 +242,8 @@ public class TezMapReduceSplitsGrouper {
* upper and lower bounds for the splits
*/
public TezMRSplitsGrouperConfigBuilder setGroupingSplitSize(long lowerBound, long upperBound) {
- this.conf.setLong(TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound);
- this.conf.setLong(TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound);
+ this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE, lowerBound);
+ this.conf.setLong(TezSplitGrouper.TEZ_GROUPING_SPLIT_MAX_SIZE, upperBound);
return this;
}
@@ -593,5 +251,4 @@ public class TezMapReduceSplitsGrouper {
return this.conf;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index b93e4ba..ac64bf7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -23,6 +23,7 @@ import java.util.List;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -97,8 +98,8 @@ public class MRInputAMSplitGenerator extends InputInitializer {
int totalResource = getContext().getTotalAvailableResource().getMemory();
int taskResource = getContext().getVertexTaskResource().getMemory();
float waves = conf.getFloat(
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
- TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
int numTasks = (int)((totalResource*waves)/taskResource);
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
new file mode 100644
index 0000000..c236257
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/GroupedSplitContainer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+
+/**
+ * An entity to hold grouped splits - either mapred or mapreduce.
+ *
+ * <p>Keeps the wrapped splits in the order they were added, together with the
+ * wrapped input format name, the locations and the rack handed to the
+ * constructor, and a running total of the lengths of the added splits.</p>
+ */
+@InterfaceAudience.Private
+public class GroupedSplitContainer {
+
+  private final List<SplitContainer> wrappedSplits;
+  private final String wrappedInputFormatName;
+  private final String[] locations;
+  private final String rack;
+  // Running sum of getLength() over every split passed to addSplit().
+  long length = 0;
+
+  /**
+   * Creates an empty group.
+   *
+   * @param numSplits initial capacity of the list of wrapped splits
+   * @param wrappedInputFormatName name returned by {@link #getWrappedInputFormatName()}
+   * @param locations array returned by {@link #getLocations()}; stored by reference
+   * @param rack value returned by {@link #getRack()}
+   */
+  public GroupedSplitContainer(int numSplits, String wrappedInputFormatName,
+      String[] locations, String rack) {
+    this.wrappedSplits = Lists.newArrayListWithCapacity(numSplits);
+    this.wrappedInputFormatName = wrappedInputFormatName;
+    this.locations = locations;
+    this.rack = rack;
+  }
+
+
+  /**
+   * Appends a split to this group and adds its length to the group total.
+   *
+   * <p>The split is added to the list before its length is queried, so it stays
+   * in the group even when the length lookup fails.</p>
+   *
+   * @param splitContainer the split to add
+   * @throws TezUncheckedException wrapping any exception thrown while reading
+   *         the split length; if that exception is an
+   *         {@link InterruptedException} the current thread's interrupt status
+   *         is restored first
+   */
+  public void addSplit(SplitContainer splitContainer) {
+    wrappedSplits.add(splitContainer);
+    try {
+      length += splitContainer.getLength();
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Wrapping in an unchecked exception would otherwise hide the
+        // interruption from code further up the stack.
+        Thread.currentThread().interrupt();
+      }
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * @return the sum of the lengths of all splits added so far
+   */
+  public long getLength() {
+    return length;
+  }
+
+  /**
+   * @return the wrapped input format name supplied at construction
+   */
+  public String getWrappedInputFormatName() {
+    return this.wrappedInputFormatName;
+  }
+
+  /**
+   * @return the internal list of wrapped splits (not a copy)
+   */
+  public List<SplitContainer> getWrappedSplitContainers() {
+    return this.wrappedSplits;
+  }
+
+  /**
+   * @return the locations array supplied at construction (not a copy)
+   */
+  public String[] getLocations() {
+    return this.locations;
+  }
+
+  /**
+   * @return the rack supplied at construction
+   */
+  public String getRack() {
+    return this.rack;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
new file mode 100644
index 0000000..63e2138
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapReduceSplitContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * {@link SplitContainer} backed by a single
+ * {@code org.apache.hadoop.mapreduce.InputSplit}.
+ */
+public class MapReduceSplitContainer extends SplitContainer {
+
+  private final InputSplit inputSplit;
+
+  /**
+   * @param inputSplit the split to wrap; rejected by
+   *        {@code Preconditions.checkNotNull} when null
+   */
+  public MapReduceSplitContainer(InputSplit inputSplit) {
+    this.inputSplit = Preconditions.checkNotNull(inputSplit);
+  }
+
+  /**
+   * @return the locations reported by the wrapped split
+   */
+  @Override
+  public String[] getPreferredLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  /**
+   * @return the length reported by the wrapped split
+   */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  /**
+   * @return the wrapped split itself
+   */
+  public InputSplit getRawSplit() {
+    return this.inputSplit;
+  }
+
+  /**
+   * Two containers are equal when they are of the same class and their wrapped
+   * splits are equal according to the split's own {@code equals}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    InputSplit otherSplit = ((MapReduceSplitContainer) o).inputSplit;
+    if (inputSplit == null) {
+      return otherSplit == null;
+    }
+    return inputSplit.equals(otherSplit);
+  }
+
+  /**
+   * Hash of the wrapped split, or 0 when there is none.
+   */
+  @Override
+  public int hashCode() {
+    if (inputSplit == null) {
+      return 0;
+    }
+    return inputSplit.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
new file mode 100644
index 0000000..f7dbfda
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/MapredSplitContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * {@link SplitContainer} backed by a single
+ * {@code org.apache.hadoop.mapred.InputSplit}.
+ */
+public class MapredSplitContainer extends SplitContainer {
+
+  private final InputSplit inputSplit;
+
+  /**
+   * @param inputSplit the split to wrap; rejected by
+   *        {@code Preconditions.checkNotNull} when null
+   */
+  public MapredSplitContainer(InputSplit inputSplit) {
+    this.inputSplit = Preconditions.checkNotNull(inputSplit);
+  }
+
+  /**
+   * @return the locations reported by the wrapped split
+   */
+  @Override
+  public String[] getPreferredLocations() throws IOException {
+    return inputSplit.getLocations();
+  }
+
+  /**
+   * @return the length reported by the wrapped split
+   */
+  @Override
+  public long getLength() throws IOException {
+    return inputSplit.getLength();
+  }
+
+  /**
+   * @return the wrapped split itself
+   */
+  public InputSplit getRawSplit() {
+    return this.inputSplit;
+  }
+
+  /**
+   * Two containers are equal when they are of the same class and their wrapped
+   * splits are equal according to the split's own {@code equals}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    InputSplit otherSplit = ((MapredSplitContainer) o).inputSplit;
+    if (inputSplit == null) {
+      return otherSplit == null;
+    }
+    return inputSplit.equals(otherSplit);
+  }
+
+  /**
+   * Hash of the wrapped split, or 0 when there is none.
+   */
+  @Override
+  public int hashCode() {
+    if (inputSplit == null) {
+      return 0;
+    }
+    return inputSplit.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
new file mode 100644
index 0000000..383b9ca
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitContainer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Abstract base class used to represent both mapred and mapreduce splits.
+ *
+ * <p>Subclasses supply the preferred locations and the length of the split
+ * they wrap; this class additionally carries a "processed" flag that starts
+ * out {@code false}.</p>
+ */
+@InterfaceAudience.Private
+public abstract class SplitContainer {
+
+  // Mutable marker, read and written only through the accessors below.
+  private boolean isProcessed = false;
+
+
+  /**
+   * @return the preferred locations of the wrapped split
+   * @throws IOException if the wrapped split fails to report its locations
+   * @throws InterruptedException if the lookup is interrupted
+   */
+  public abstract String[] getPreferredLocations() throws IOException, InterruptedException;
+
+  /**
+   * @return the length of the wrapped split
+   * @throws IOException if the wrapped split fails to report its length
+   * @throws InterruptedException if the lookup is interrupted
+   */
+  public abstract long getLength() throws IOException, InterruptedException;
+
+  /**
+   * @return the current value of the processed flag
+   */
+  public boolean isProcessed() {
+    return isProcessed;
+  }
+
+  /**
+   * @param val new value of the processed flag
+   */
+  public void setIsProcessed(boolean val) {
+    this.isProcessed = val;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
new file mode 100644
index 0000000..ebb33ad
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+
+/**
+ * An interface to handle split size estimation across mapred and mapreduce splits
+ */
+@InterfaceAudience.Private
+public interface SplitSizeEstimatorWrapper {
+
+ /**
+  * Returns the estimated size of the given split.
+  *
+  * @param splitContainer the split to estimate
+  * @return the estimated size (unit not stated here; presumably the same unit
+  *         as the split length - confirm against the wrapped estimator)
+  * @throws IOException if the estimate cannot be computed
+  * @throws InterruptedException if the computation is interrupted
+  */
+ long getEstimatedSize(SplitContainer splitContainer) throws IOException, InterruptedException;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
new file mode 100644
index 0000000..df6e3c9
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapReduce.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.split.SplitSizeEstimator;
+
+/**
+ * Adapts a mapreduce {@link SplitSizeEstimator} to the
+ * {@link SplitSizeEstimatorWrapper} interface.
+ */
+public class SplitSizeEstimatorWrapperMapReduce implements SplitSizeEstimatorWrapper {
+
+  private final SplitSizeEstimator estimator;
+
+  /**
+   * @param estimator the estimator every size request is delegated to
+   */
+  public SplitSizeEstimatorWrapperMapReduce(SplitSizeEstimator estimator) {
+    this.estimator = estimator;
+  }
+
+  /**
+   * Delegates to the wrapped estimator using the raw split held by the
+   * container.
+   *
+   * @param rawContainer must be a {@link MapReduceSplitContainer}; it is cast
+   *        unconditionally
+   * @return the size reported by the wrapped estimator
+   */
+  @Override
+  public long getEstimatedSize(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    return estimator.getEstimatedSize(((MapReduceSplitContainer) rawContainer).getRawSplit());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/6632903b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
new file mode 100644
index 0000000..6dd3a56
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitSizeEstimatorWrapperMapred.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
+
+/**
+ * Adapts a mapred {@link SplitSizeEstimator} to the
+ * {@link SplitSizeEstimatorWrapper} interface.
+ */
+public class SplitSizeEstimatorWrapperMapred implements SplitSizeEstimatorWrapper {
+
+  private final SplitSizeEstimator estimator;
+
+  /**
+   * @param estimator the estimator every size request is delegated to
+   */
+  public SplitSizeEstimatorWrapperMapred(SplitSizeEstimator estimator) {
+    this.estimator = estimator;
+  }
+
+  /**
+   * Delegates to the wrapped estimator using the raw split held by the
+   * container.
+   *
+   * @param rawContainer must be a {@link MapredSplitContainer}; it is cast
+   *        unconditionally
+   * @return the size reported by the wrapped estimator
+   */
+  @Override
+  public long getEstimatedSize(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    return estimator.getEstimatedSize(((MapredSplitContainer) rawContainer).getRawSplit());
+  }
+}