You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2016/06/22 01:02:03 UTC

tez git commit: TEZ-3291. Optimize splits grouping when locality information is not available (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 2d72a040b -> 1264d5d01


TEZ-3291. Optimize splits grouping when locality information is not available (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1264d5d0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1264d5d0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1264d5d0

Branch: refs/heads/master
Commit: 1264d5d019c86ae5e26240693bfb8c0a81d44dcd
Parents: 2d72a04
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Jun 21 18:01:46 2016 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Jun 21 18:01:46 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/mapreduce/grouper/TezSplitGrouper.java  | 69 ++++++++++++--------
 .../hadoop/mapred/split/TestGroupedSplits.java  | 51 +++++++++++++++
 3 files changed, 92 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d060d17..bb130ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3291. Optimize splits grouping when locality information is not available.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
   TEZ-3216. Add support for more precise partition stats in VertexManagerEvent.

http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 9435e68..26e5a9e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -182,6 +182,36 @@ public abstract class TezSplitGrouper {
       locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
     }
 
+    List<GroupedSplitContainer> groupedSplits = null;
+    String emptyLocation = "EmptyLocation";
+    String localhost = "localhost";
+    String[] emptyLocations = {emptyLocation};
+    groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
+
+    boolean allSplitsHaveLocalhost = true;
+
+    long totalLength = 0;
+    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
+    // go through splits and add them to locations
+    for (SplitContainer split : originalSplits) {
+      totalLength += estimator.getEstimatedSize(split);
+      String[] locations = locationProvider.getPreferredLocations(split);
+      if (locations == null || locations.length == 0) {
+        locations = emptyLocations;
+        allSplitsHaveLocalhost = false;
+      }
+      for (String location : locations ) {
+        if (location == null) {
+          location = emptyLocation;
+          allSplitsHaveLocalhost = false;
+        }
+        if (!location.equalsIgnoreCase(localhost)) {
+          allSplitsHaveLocalhost = false;
+        }
+        distinctLocations.put(location, null);
+      }
+    }
+
     if (! (configNumSplits > 0 ||
         originalSplits.size() == 0)) {
       // numSplits has not been overridden by config
@@ -189,10 +219,6 @@ public abstract class TezSplitGrouper {
       // there are splits generated
       // desired splits is less than number of splits generated
       // Do sanity checks
-      long totalLength = 0;
-      for (SplitContainer split : originalSplits) {
-        totalLength += estimator.getEstimatedSize(split);
-      }
 
       int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
       long lengthPerGroup = totalLength/splitCount;
@@ -223,19 +249,25 @@ public abstract class TezSplitGrouper {
       } else if (lengthPerGroup < minLengthPerGroup) {
         // splits too small to work. Need to override with size.
         int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
+        /**
+         * This is a workaround for systems like S3 that pass the same
+         * fake hostname for all splits.
+         */
+        if (!allSplitsHaveLocalhost) {
+          desiredNumSplits = newDesiredNumSplits;
+        }
+
         LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
             " Desired splitLength: " + lengthPerGroup +
             " Min splitLength: " + minLengthPerGroup +
             " New desired splits: " + newDesiredNumSplits +
+            " Final desired splits: " + desiredNumSplits +
+            " All splits have localhost: " + allSplitsHaveLocalhost +
             " Total length: " + totalLength +
             " Original splits: " + originalSplits.size());
-
-        desiredNumSplits = newDesiredNumSplits;
       }
     }
 
-    List<GroupedSplitContainer> groupedSplits = null;
-
     if (desiredNumSplits == 0 ||
         originalSplits.size() == 0 ||
         desiredNumSplits >= originalSplits.size()) {
@@ -253,27 +285,6 @@ public abstract class TezSplitGrouper {
       return groupedSplits;
     }
 
-    String emptyLocation = "EmptyLocation";
-    String[] emptyLocations = {emptyLocation};
-    groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
-
-    long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
-    // go through splits and add them to locations
-    for (SplitContainer split : originalSplits) {
-      totalLength += estimator.getEstimatedSize(split);
-      String[] locations = locationProvider.getPreferredLocations(split);
-      if (locations == null || locations.length == 0) {
-        locations = emptyLocations;
-      }
-      for (String location : locations ) {
-        if (location == null) {
-          location = emptyLocation;
-        }
-        distinctLocations.put(location, null);
-      }
-    }
-
     long lengthPerGroup = totalLength/desiredNumSplits;
     int numNodeLocations = distinctLocations.size();
     int numSplitsPerLocation = originalSplits.size()/numNodeLocations;

http://git-wip-us.apache.org/repos/asf/tez/blob/1264d5d0/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index fba72a3..3dce417 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -484,6 +484,57 @@ public class TestGroupedSplits {
       }
     }
   }
+
+
+  @Test (timeout = 30000)
+  public void testS3Scenario() throws IOException {
+    // A cluster can have multiple nodes, but on S3 every split reports "localhost" as its location
+    String[] locations = {"localhost"};
+    int oriSplits = 52;
+    int desiredSplits = 19;
+    long splitLength = 231958;
+
+    InputSplit[] origSplits = new InputSplit[oriSplits];
+
+    for (int i = 0; i < oriSplits; i++) {
+      String[] splitLoc = locations;
+      origSplits[i] = new TestInputSplit(splitLength, splitLoc, i);
+    }
+
+    TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+    JobConf conf = new JobConf(defaultConf);
+    conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build();
+
+    //Create splits now
+    InputSplit[] groupedSplits =
+        grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat");
+
+    //Verify
+    int splitsInGroup = oriSplits / desiredSplits;
+    int totalSplits = (int) Math.ceil(oriSplits * 1.0 / splitsInGroup);
+    assertEquals(totalSplits, groupedSplits.length);
+
+
+    // The localhost-only optimization must not be applied if any split location is not localhost
+    String[] nonLocalLocations = { "EmptyLocation", "localhost" };
+
+    origSplits = new InputSplit[oriSplits];
+
+    for (int i = 0; i < oriSplits; i++) {
+      String[] splitLoc = nonLocalLocations;
+      origSplits[i] = new TestInputSplit(splitLength, splitLoc, i);
+    }
+
+    grouper = new TezMapredSplitsGrouper();
+    conf = new JobConf(defaultConf);
+    conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build();
+
+    //Create splits now
+    groupedSplits = grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat");
+
+    // All original splits should end up in a single group
+    assertEquals(1, groupedSplits.length);
+  }
   
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test(timeout=10000)