You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@tez.apache.org by rb...@apache.org on 2016/06/22 01:02:03 UTC
tez git commit: TEZ-3291. Optimize splits grouping when locality information is not available (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 2d72a040b -> 1264d5d01
TEZ-3291. Optimize splits grouping when locality information is not available (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1264d5d0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1264d5d0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1264d5d0
Branch: refs/heads/master
Commit: 1264d5d019c86ae5e26240693bfb8c0a81d44dcd
Parents: 2d72a04
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Jun 21 18:01:46 2016 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Jun 21 18:01:46 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/grouper/TezSplitGrouper.java | 69 ++++++++++++--------
.../hadoop/mapred/split/TestGroupedSplits.java | 51 +++++++++++++++
3 files changed, 92 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d060d17..bb130ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3291. Optimize splits grouping when locality information is not available.
TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
TEZ-3216. Add support for more precise partition stats in VertexManagerEvent.
http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 9435e68..26e5a9e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -182,6 +182,36 @@ public abstract class TezSplitGrouper {
locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
}
+ List<GroupedSplitContainer> groupedSplits = null;
+ String emptyLocation = "EmptyLocation";
+ String localhost = "localhost";
+ String[] emptyLocations = {emptyLocation};
+ groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
+
+ boolean allSplitsHaveLocalhost = true;
+
+ long totalLength = 0;
+ Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
+ // go through splits and add them to locations
+ for (SplitContainer split : originalSplits) {
+ totalLength += estimator.getEstimatedSize(split);
+ String[] locations = locationProvider.getPreferredLocations(split);
+ if (locations == null || locations.length == 0) {
+ locations = emptyLocations;
+ allSplitsHaveLocalhost = false;
+ }
+ for (String location : locations ) {
+ if (location == null) {
+ location = emptyLocation;
+ allSplitsHaveLocalhost = false;
+ }
+ if (!location.equalsIgnoreCase(localhost)) {
+ allSplitsHaveLocalhost = false;
+ }
+ distinctLocations.put(location, null);
+ }
+ }
+
if (! (configNumSplits > 0 ||
originalSplits.size() == 0)) {
// numSplits has not been overridden by config
@@ -189,10 +219,6 @@ public abstract class TezSplitGrouper {
// there are splits generated
// desired splits is less than number of splits generated
// Do sanity checks
- long totalLength = 0;
- for (SplitContainer split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- }
int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
long lengthPerGroup = totalLength/splitCount;
@@ -223,19 +249,25 @@ public abstract class TezSplitGrouper {
} else if (lengthPerGroup < minLengthPerGroup) {
// splits too small to work. Need to override with size.
int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
+ /**
+ * This is a workaround for systems like S3 that pass the same
+ * fake hostname for all splits.
+ */
+ if (!allSplitsHaveLocalhost) {
+ desiredNumSplits = newDesiredNumSplits;
+ }
+
LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
" Desired splitLength: " + lengthPerGroup +
" Min splitLength: " + minLengthPerGroup +
" New desired splits: " + newDesiredNumSplits +
+ " Final desired splits: " + desiredNumSplits +
+ " All splits have localhost: " + allSplitsHaveLocalhost +
" Total length: " + totalLength +
" Original splits: " + originalSplits.size());
-
- desiredNumSplits = newDesiredNumSplits;
}
}
- List<GroupedSplitContainer> groupedSplits = null;
-
if (desiredNumSplits == 0 ||
originalSplits.size() == 0 ||
desiredNumSplits >= originalSplits.size()) {
@@ -253,27 +285,6 @@ public abstract class TezSplitGrouper {
return groupedSplits;
}
- String emptyLocation = "EmptyLocation";
- String[] emptyLocations = {emptyLocation};
- groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
-
- long totalLength = 0;
- Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
- // go through splits and add them to locations
- for (SplitContainer split : originalSplits) {
- totalLength += estimator.getEstimatedSize(split);
- String[] locations = locationProvider.getPreferredLocations(split);
- if (locations == null || locations.length == 0) {
- locations = emptyLocations;
- }
- for (String location : locations ) {
- if (location == null) {
- location = emptyLocation;
- }
- distinctLocations.put(location, null);
- }
- }
-
long lengthPerGroup = totalLength/desiredNumSplits;
int numNodeLocations = distinctLocations.size();
int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index fba72a3..3dce417 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -484,6 +484,57 @@ public class TestGroupedSplits {
}
}
}
+
+
+ @Test (timeout = 30000)
+ public void testS3Scenario() throws IOException {
+ //There can be multiple nodes in cluster, but locations would be "localhost" in s3
+ String[] locations = {"localhost"};
+ int oriSplits = 52;
+ int desiredSplits = 19;
+ long splitLength = 231958;
+
+ InputSplit[] origSplits = new InputSplit[oriSplits];
+
+ for (int i = 0; i < oriSplits; i++) {
+ String[] splitLoc = locations;
+ origSplits[i] = new TestInputSplit(splitLength, splitLoc, i);
+ }
+
+ TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+ JobConf conf = new JobConf(defaultConf);
+ conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build();
+
+ //Create splits now
+ InputSplit[] groupedSplits =
+ grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat");
+
+ //Verify
+ int splitsInGroup = oriSplits / desiredSplits;
+ int totalSplits = (int) Math.ceil(oriSplits * 1.0 / splitsInGroup);
+ assertEquals(totalSplits, groupedSplits.length);
+
+
+ // min split optimization should not be invoked if any location is not localhost
+ String[] nonLocalLocations = { "EmptyLocation", "localhost" };
+
+ origSplits = new InputSplit[oriSplits];
+
+ for (int i = 0; i < oriSplits; i++) {
+ String[] splitLoc = nonLocalLocations;
+ origSplits[i] = new TestInputSplit(splitLength, splitLoc, i);
+ }
+
+ grouper = new TezMapredSplitsGrouper();
+ conf = new JobConf(defaultConf);
+ conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build();
+
+ //Create splits now
+ groupedSplits = grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat");
+
+ //splits should be 1
+ assertEquals(1, groupedSplits.length);
+ }
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout=10000)