You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/11/10 00:58:15 UTC

tez git commit: TEZ-2879. While grouping splits, allow an alternate list of preferred locations to be provided per split. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 38b39003b -> 4de112b68


TEZ-2879. While grouping splits, allow an alternate list of preferred
locations to be provided per split. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4de112b6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4de112b6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4de112b6

Branch: refs/heads/master
Commit: 4de112b689d06babdbcc2fcf31d4cf008994247a
Parents: 38b3900
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 9 15:57:53 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 9 15:57:53 2015 -0800

----------------------------------------------------------------------
 .../mapred/split/SplitLocationProvider.java     | 26 ++++++
 .../split/TezGroupedSplitsInputFormat.java      | 14 +++-
 .../mapred/split/TezMapredSplitsGrouper.java    | 16 +++-
 .../mapreduce/split/SplitLocationProvider.java  | 26 ++++++
 .../split/SplitLocationProviderMapReduce.java   | 39 +++++++++
 .../split/TezGroupedSplitsInputFormat.java      | 15 +++-
 .../split/TezMapReduceSplitsGrouper.java        | 13 ++-
 .../grouper/SplitLocationProviderWrapper.java   | 24 ++++++
 .../SplitLocationProviderWrapperMapred.java     | 37 +++++++++
 .../tez/mapreduce/grouper/TezSplitGrouper.java  | 28 +++++--
 .../hadoop/mapred/split/TestGroupedSplits.java  | 86 ++++++++++++++++++++
 11 files changed, 311 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
new file mode 100644
index 0000000..f97d9ae
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * Provides location information for the given split
+ */
+public interface SplitLocationProvider {
+  String[] getLocations(InputSplit split) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index b361aec..e082e3a 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezException;
 
@@ -53,6 +52,7 @@ public class TezGroupedSplitsInputFormat<K, V>
   Configuration conf;
 
   SplitSizeEstimator estimator;
+  SplitLocationProvider locationProvider;
   
   public TezGroupedSplitsInputFormat() {
     
@@ -72,6 +72,14 @@ public class TezGroupedSplitsInputFormat<K, V>
       LOG.debug("Split size estimator : " + estimator);
     }
   }
+
+  public void setSplitLocationProvider(SplitLocationProvider locationProvider) {
+    Preconditions.checkArgument(locationProvider != null);
+    this.locationProvider = locationProvider;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split size location provider: " + locationProvider);
+    }
+  }
   
   public void setDesiredNumberOfSplits(int num) {
     Preconditions.checkArgument(num >= 0);
@@ -86,7 +94,9 @@ public class TezGroupedSplitsInputFormat<K, V>
     InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits);
     TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
+    return grouper
+        .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator,
+            locationProvider);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index f2a8a0c..2bfccfa 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
 import org.apache.tez.mapreduce.grouper.MapredSplitContainer;
 import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapperMapred;
 import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred;
 import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
@@ -54,8 +55,17 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper {
   }
 
   public InputSplit[] getGroupedSplits(Configuration conf,
+                                       InputSplit[] originalSplits, int desiredNumSplits,
+                                       String wrappedInputFormatName,
+                                       SplitSizeEstimator estimator) throws IOException {
+    return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName,
+        estimator, null);
+  }
+
+
+  public InputSplit[] getGroupedSplits(Configuration conf,
       InputSplit[] originalSplits, int desiredNumSplits,
-      String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException {
+      String wrappedInputFormatName, SplitSizeEstimator estimator, SplitLocationProvider locationProvider) throws IOException {
     Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
 
     List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits),
@@ -70,7 +80,9 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper {
       List<InputSplit> resultList = Lists.transform(super
               .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
                   wrappedInputFormatName, estimator == null ? null :
-                  new SplitSizeEstimatorWrapperMapred(estimator)),
+                      new SplitSizeEstimatorWrapperMapred(estimator),
+                  locationProvider == null ? null :
+                      new SplitLocationProviderWrapperMapred(locationProvider)),
           new Function<GroupedSplitContainer, InputSplit>() {
             @Override
             public InputSplit apply(GroupedSplitContainer input) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
new file mode 100644
index 0000000..e4bada4
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Provides location information for the given split
+ */
+public interface SplitLocationProvider {
+  String[] getLocations(InputSplit split) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
new file mode 100644
index 0000000..2cf76e7
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapper;
+
+@InterfaceAudience.Private
+public class SplitLocationProviderMapReduce implements SplitLocationProviderWrapper {
+
+  private final SplitLocationProvider locationProvider;
+
+  public SplitLocationProviderMapReduce(SplitLocationProvider locationProvider) {
+    this.locationProvider = locationProvider;
+  }
+
+  @Override
+  public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    MapReduceSplitContainer splitContainer = (MapReduceSplitContainer) rawContainer;
+    return locationProvider.getLocations(splitContainer.getRawSplit());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 49dc70c..5988728 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -54,6 +54,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   int desiredNumSplits = 0;
   Configuration conf;
   SplitSizeEstimator estimator;
+  SplitLocationProvider locationProvider;
   
   public TezGroupedSplitsInputFormat() {
     
@@ -81,14 +82,24 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
       LOG.debug("Split size estimator : " + estimator);
     }
   }
-  
+
+  public void setSplitLocationProvider(SplitLocationProvider locationProvider) {
+    Preconditions.checkArgument(locationProvider != null);
+    this.locationProvider = locationProvider;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split location provider : " + locationProvider);
+    }
+  }
+
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException,
       InterruptedException {
     List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context);
     TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
+    return grouper
+        .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator,
+            locationProvider);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 87729bd..b36d11d 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -145,6 +145,15 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
                                            String wrappedInputFormatName,
                                            SplitSizeEstimator estimator) throws IOException,
       InterruptedException {
+    return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator, null);
+  }
+
+  public List<InputSplit> getGroupedSplits(Configuration conf,
+                                           List<InputSplit> originalSplits, int desiredNumSplits,
+                                           String wrappedInputFormatName,
+                                           SplitSizeEstimator estimator,
+                                           SplitLocationProvider locationProvider) throws IOException,
+      InterruptedException {
     Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
     List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits,
         new Function<InputSplit, SplitContainer>() {
@@ -158,7 +167,9 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
     return Lists.transform(super
             .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
                 wrappedInputFormatName, estimator == null ? null :
-                new SplitSizeEstimatorWrapperMapReduce(estimator)),
+                    new SplitSizeEstimatorWrapperMapReduce(estimator),
+                locationProvider == null ? null :
+                    new SplitLocationProviderMapReduce(locationProvider)),
         new Function<GroupedSplitContainer, InputSplit>() {
           @Override
           public InputSplit apply(GroupedSplitContainer input) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
new file mode 100644
index 0000000..b30f174
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface SplitLocationProviderWrapper {
+  String[] getPreferredLocations(SplitContainer splitContainer) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
new file mode 100644
index 0000000..89a15ba
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+
+@InterfaceAudience.Private
+public class SplitLocationProviderWrapperMapred implements SplitLocationProviderWrapper {
+
+  private final SplitLocationProvider locationProvider;
+
+  public SplitLocationProviderWrapperMapred(SplitLocationProvider locationProvider) {
+    this.locationProvider = locationProvider;
+  }
+
+  @Override
+  public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException,
+      InterruptedException {
+    MapredSplitContainer splitContainer = (MapredSplitContainer)rawContainer;
+    return locationProvider.getLocations(splitContainer.getRawSplit());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index eb616a0..848b06f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -129,6 +129,17 @@ public abstract class TezSplitGrouper {
     }
   }
 
+  private static final SplitLocationProviderWrapper DEFAULT_SPLIT_LOCATION_PROVIDER = new DefaultSplitLocationProvider();
+
+  static final class DefaultSplitLocationProvider implements SplitLocationProviderWrapper {
+
+    @Override
+    public String[] getPreferredLocations(SplitContainer splitContainer) throws IOException,
+        InterruptedException {
+      return splitContainer.getPreferredLocations();
+    }
+  }
+
   Map<String, LocationHolder> createLocationsMap(Configuration conf) {
     if (conf.getBoolean(TEZ_GROUPING_REPEATABLE,
         TEZ_GROUPING_REPEATABLE_DEFAULT)) {
@@ -141,7 +152,8 @@ public abstract class TezSplitGrouper {
                                                       List<SplitContainer> originalSplits,
                                                       int desiredNumSplits,
                                                       String wrappedInputFormatName,
-                                                      SplitSizeEstimatorWrapper estimator) throws
+                                                      SplitSizeEstimatorWrapper estimator,
+                                                      SplitLocationProviderWrapper locationProvider) throws
       IOException, InterruptedException {
     LOG.info("Grouping splits in Tez");
     Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
@@ -156,6 +168,9 @@ public abstract class TezSplitGrouper {
     if (estimator == null) {
       estimator = DEFAULT_SPLIT_ESTIMATOR;
     }
+    if (locationProvider == null) {
+      locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
+    }
 
     if (! (configNumSplits > 0 ||
         originalSplits.size() == 0)) {
@@ -218,9 +233,10 @@ public abstract class TezSplitGrouper {
       LOG.info("Using original number of splits: " + originalSplits.size() +
           " desired splits: " + desiredNumSplits);
       groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
+      // TODO TEZ-2911 null entries inside a non-null String[] are handled differently here than when grouping happens.
       for (SplitContainer split : originalSplits) {
         GroupedSplitContainer newSplit =
-            new GroupedSplitContainer(1, wrappedInputFormatName, split.getPreferredLocations(),
+            new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split),
                 null);
         newSplit.addSplit(split);
         groupedSplits.add(newSplit);
@@ -237,7 +253,7 @@ public abstract class TezSplitGrouper {
     // go through splits and add them to locations
     for (SplitContainer split : originalSplits) {
       totalLength += estimator.getEstimatedSize(split);
-      String[] locations = split.getPreferredLocations();
+      String[] locations = locationProvider.getPreferredLocations(split);
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
       }
@@ -262,7 +278,7 @@ public abstract class TezSplitGrouper {
     Set<String> locSet = new HashSet<String>();
     for (SplitContainer split : originalSplits) {
       locSet.clear();
-      String[] locations = split.getPreferredLocations();
+      String[] locations = locationProvider.getPreferredLocations(split);
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
       }
@@ -352,7 +368,7 @@ public abstract class TezSplitGrouper {
           groupLocation = null;
         } else if (doingRackLocal) {
           for (SplitContainer splitH : group) {
-            String[] locations = splitH.getPreferredLocations();
+            String[] locations = locationProvider.getPreferredLocations(splitH);
             if (locations != null) {
               for (String loc : locations) {
                 if (loc != null) {
@@ -436,7 +452,7 @@ public abstract class TezSplitGrouper {
           }
           numRackSplitsToGroup--;
           rackSet.clear();
-          String[] locations = split.getPreferredLocations();
+          String[] locations = locationProvider.getPreferredLocations(split);
           if (locations == null || locations.length == 0) {
             locations = emptyLocations;
           }

http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index eddcc42..43776f7 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -615,4 +616,89 @@ public class TestGroupedSplits {
     }
   }
 
+
+  // Splits get grouped
+  @Test (timeout = 10000)
+  public void testGroupingWithCustomLocations1() throws IOException {
+
+    int numSplits = 3;
+    InputSplit[] mockSplits = new InputSplit[numSplits];
+    InputSplit mockSplit1 = mock(InputSplit.class);
+    when(mockSplit1.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"});
+    mockSplits[0] = mockSplit1;
+    InputSplit mockSplit2 = mock(InputSplit.class);
+    when(mockSplit2.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"});
+    mockSplits[1] = mockSplit2;
+    InputSplit mockSplit3 = mock(InputSplit.class);
+    when(mockSplit3.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"});
+    mockSplits[2] = mockSplit3;
+
+    SplitLocationProvider locationProvider = new SplitLocationProvider() {
+      @Override
+      public String[] getLocations(InputSplit split) throws IOException {
+        return new String[] {"customLocation"};
+      }
+    };
+
+    TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper();
+    InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 1,
+        "MockInputForamt", null, locationProvider);
+
+    // Sanity. 1 group, with 3 splits.
+    Assert.assertEquals(1, groupedSplits.length);
+    Assert.assertTrue(groupedSplits[0] instanceof  TezGroupedSplit);
+    TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[0];
+    Assert.assertEquals(3, groupedSplit.getGroupedSplits().size());
+
+    // Verify that the split ends up being grouped to the custom location.
+    Assert.assertEquals(1, groupedSplit.getLocations().length);
+    Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+  }
+
+  // Original splits returned.
+  @Test (timeout = 10000)
+  public void testGroupingWithCustomLocations2() throws IOException {
+
+    int numSplits = 3;
+    InputSplit[] mockSplits = new InputSplit[numSplits];
+    InputSplit mockSplit1 = mock(InputSplit.class);
+    when(mockSplit1.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"});
+    mockSplits[0] = mockSplit1;
+    InputSplit mockSplit2 = mock(InputSplit.class);
+    when(mockSplit2.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"});
+    mockSplits[1] = mockSplit2;
+    InputSplit mockSplit3 = mock(InputSplit.class);
+    when(mockSplit3.getLength()).thenReturn(100*1000*1000l);
+    when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"});
+    mockSplits[2] = mockSplit3;
+
+    SplitLocationProvider locationProvider = new SplitLocationProvider() {
+      @Override
+      public String[] getLocations(InputSplit split) throws IOException {
+        return new String[] {"customLocation"};
+      }
+    };
+
+    TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper();
+    InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 3,
+        "MockInputForamt", null, locationProvider);
+
+    // Sanity. 3 groups, with 1 split each.
+    Assert.assertEquals(3, groupedSplits.length);
+    for (int i = 0 ; i < 3 ; i++) {
+      Assert.assertTrue(groupedSplits[i] instanceof  TezGroupedSplit);
+      TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[i];
+      Assert.assertEquals(1, groupedSplit.getGroupedSplits().size());
+
+      // Verify the splits have their final location set to customLocation
+      Assert.assertEquals(1, groupedSplit.getLocations().length);
+      Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+    }
+  }
+
 }