You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/11/10 00:58:15 UTC
tez git commit: TEZ-2879. While grouping splits,
allow an alternate list of preferred locations to be provided per
split. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 38b39003b -> 4de112b68
TEZ-2879. While grouping splits, allow an alternate list of preferred
locations to be provided per split. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4de112b6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4de112b6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4de112b6
Branch: refs/heads/master
Commit: 4de112b689d06babdbcc2fcf31d4cf008994247a
Parents: 38b3900
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 9 15:57:53 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 9 15:57:53 2015 -0800
----------------------------------------------------------------------
.../mapred/split/SplitLocationProvider.java | 26 ++++++
.../split/TezGroupedSplitsInputFormat.java | 14 +++-
.../mapred/split/TezMapredSplitsGrouper.java | 16 +++-
.../mapreduce/split/SplitLocationProvider.java | 26 ++++++
.../split/SplitLocationProviderMapReduce.java | 39 +++++++++
.../split/TezGroupedSplitsInputFormat.java | 15 +++-
.../split/TezMapReduceSplitsGrouper.java | 13 ++-
.../grouper/SplitLocationProviderWrapper.java | 24 ++++++
.../SplitLocationProviderWrapperMapred.java | 37 +++++++++
.../tez/mapreduce/grouper/TezSplitGrouper.java | 28 +++++--
.../hadoop/mapred/split/TestGroupedSplits.java | 86 ++++++++++++++++++++
11 files changed, 311 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
new file mode 100644
index 0000000..f97d9ae
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * Supplies the locations to use for a split; when a provider is set, the grouper consults it instead of the split's own preferred locations.
+ */
+public interface SplitLocationProvider {
+ String[] getLocations(InputSplit split) throws IOException; // null or empty is tolerated by the grouping loops, which fall back to emptyLocations
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index b361aec..e082e3a 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.split.SplitSizeEstimator;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezException;
@@ -53,6 +52,7 @@ public class TezGroupedSplitsInputFormat<K, V>
Configuration conf;
SplitSizeEstimator estimator;
+ SplitLocationProvider locationProvider;
public TezGroupedSplitsInputFormat() {
@@ -72,6 +72,14 @@ public class TezGroupedSplitsInputFormat<K, V>
LOG.debug("Split size estimator : " + estimator);
}
}
+
+ public void setSplitLocationProvider(SplitLocationProvider locationProvider) { // optional override for the locations used while grouping
+ Preconditions.checkArgument(locationProvider != null); // to keep the splits' own locations, never call this setter
+ this.locationProvider = locationProvider;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split location provider : " + locationProvider);
+ }
+ }
public void setDesiredNumberOfSplits(int num) {
Preconditions.checkArgument(num >= 0);
@@ -86,7 +94,9 @@ public class TezGroupedSplitsInputFormat<K, V>
InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits);
TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
- return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
+ return grouper
+ .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator,
+ locationProvider);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index f2a8a0c..2bfccfa 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import org.apache.tez.mapreduce.grouper.GroupedSplitContainer;
import org.apache.tez.mapreduce.grouper.MapredSplitContainer;
import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapperMapred;
import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
@@ -54,8 +55,17 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper {
}
public InputSplit[] getGroupedSplits(Configuration conf,
+ InputSplit[] originalSplits, int desiredNumSplits,
+ String wrappedInputFormatName,
+ SplitSizeEstimator estimator) throws IOException { // same five-argument signature as before this change
+ return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName,
+ estimator, null); // null provider => grouping uses each split's own preferred locations
+ }
+
+
+ public InputSplit[] getGroupedSplits(Configuration conf,
InputSplit[] originalSplits, int desiredNumSplits,
- String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException {
+ String wrappedInputFormatName, SplitSizeEstimator estimator, SplitLocationProvider locationProvider) throws IOException {
Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits),
@@ -70,7 +80,9 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper {
List<InputSplit> resultList = Lists.transform(super
.getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
wrappedInputFormatName, estimator == null ? null :
- new SplitSizeEstimatorWrapperMapred(estimator)),
+ new SplitSizeEstimatorWrapperMapred(estimator),
+ locationProvider == null ? null :
+ new SplitLocationProviderWrapperMapred(locationProvider)),
new Function<GroupedSplitContainer, InputSplit>() {
@Override
public InputSplit apply(GroupedSplitContainer input) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
new file mode 100644
index 0000000..e4bada4
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Supplies the locations to use for a split; when a provider is set, the grouper consults it instead of the split's own preferred locations.
+ */
+public interface SplitLocationProvider {
+ String[] getLocations(InputSplit split) throws IOException, InterruptedException; // null or empty is tolerated by the grouping loops, which fall back to emptyLocations
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
new file mode 100644
index 0000000..2cf76e7
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitContainer;
+import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapper;
+
+@InterfaceAudience.Private
+public class SplitLocationProviderMapReduce implements SplitLocationProviderWrapper { // adapts a mapreduce-API SplitLocationProvider to the grouper's SplitContainer-based interface
+
+ private final SplitLocationProvider locationProvider;
+
+ public SplitLocationProviderMapReduce(SplitLocationProvider locationProvider) {
+ this.locationProvider = locationProvider; // not null-checked here; TezMapReduceSplitsGrouper only builds this wrapper for a non-null provider
+ }
+
+ @Override
+ public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException,
+ InterruptedException {
+ MapReduceSplitContainer splitContainer = (MapReduceSplitContainer) rawContainer; // unchecked downcast: any other container type fails with ClassCastException
+ return locationProvider.getLocations(splitContainer.getRawSplit());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 49dc70c..5988728 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -54,6 +54,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
int desiredNumSplits = 0;
Configuration conf;
SplitSizeEstimator estimator;
+ SplitLocationProvider locationProvider;
public TezGroupedSplitsInputFormat() {
@@ -81,14 +82,24 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
LOG.debug("Split size estimator : " + estimator);
}
}
-
+
+ public void setSplitLocationProvider(SplitLocationProvider locationProvider) { // optional override for the locations used while grouping
+ Preconditions.checkArgument(locationProvider != null); // to keep the splits' own locations, never call this setter
+ this.locationProvider = locationProvider;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Split location provider : " + locationProvider);
+ }
+ }
+
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
InterruptedException {
List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context);
TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper();
String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
- return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
+ return grouper
+ .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator,
+ locationProvider);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 87729bd..b36d11d 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -145,6 +145,15 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
String wrappedInputFormatName,
SplitSizeEstimator estimator) throws IOException,
InterruptedException {
+ return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator, null);
+ }
+
+ public List<InputSplit> getGroupedSplits(Configuration conf,
+ List<InputSplit> originalSplits, int desiredNumSplits,
+ String wrappedInputFormatName,
+ SplitSizeEstimator estimator,
+ SplitLocationProvider locationProvider) throws IOException,
+ InterruptedException {
Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits,
new Function<InputSplit, SplitContainer>() {
@@ -158,7 +167,9 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper {
return Lists.transform(super
.getGroupedSplits(conf, originalSplitContainers, desiredNumSplits,
wrappedInputFormatName, estimator == null ? null :
- new SplitSizeEstimatorWrapperMapReduce(estimator)),
+ new SplitSizeEstimatorWrapperMapReduce(estimator),
+ locationProvider == null ? null :
+ new SplitLocationProviderMapReduce(locationProvider)),
new Function<GroupedSplitContainer, InputSplit>() {
@Override
public InputSplit apply(GroupedSplitContainer input) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
new file mode 100644
index 0000000..b30f174
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface SplitLocationProviderWrapper { // provider abstraction used inside TezSplitGrouper; takes a SplitContainer rather than a raw InputSplit
+ String[] getPreferredLocations(SplitContainer splitContainer) throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
new file mode 100644
index 0000000..89a15ba
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.grouper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+
+@InterfaceAudience.Private
+public class SplitLocationProviderWrapperMapred implements SplitLocationProviderWrapper { // adapts a mapred-API SplitLocationProvider to the grouper's SplitContainer-based interface
+
+ private final SplitLocationProvider locationProvider;
+
+ public SplitLocationProviderWrapperMapred(SplitLocationProvider locationProvider) {
+ this.locationProvider = locationProvider; // not null-checked here; TezMapredSplitsGrouper only builds this wrapper for a non-null provider
+ }
+
+ @Override
+ public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException,
+ InterruptedException {
+ MapredSplitContainer splitContainer = (MapredSplitContainer)rawContainer; // unchecked downcast: any other container type fails with ClassCastException
+ return locationProvider.getLocations(splitContainer.getRawSplit());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index eb616a0..848b06f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -129,6 +129,17 @@ public abstract class TezSplitGrouper {
}
}
+ private static final SplitLocationProviderWrapper DEFAULT_SPLIT_LOCATION_PROVIDER = new DefaultSplitLocationProvider(); // substituted when getGroupedSplits receives a null provider
+
+ static final class DefaultSplitLocationProvider implements SplitLocationProviderWrapper {
+
+ @Override
+ public String[] getPreferredLocations(SplitContainer splitContainer) throws IOException,
+ InterruptedException {
+ return splitContainer.getPreferredLocations(); // the same call the grouper made directly before this change
+ }
+ }
+
Map<String, LocationHolder> createLocationsMap(Configuration conf) {
if (conf.getBoolean(TEZ_GROUPING_REPEATABLE,
TEZ_GROUPING_REPEATABLE_DEFAULT)) {
@@ -141,7 +152,8 @@ public abstract class TezSplitGrouper {
List<SplitContainer> originalSplits,
int desiredNumSplits,
String wrappedInputFormatName,
- SplitSizeEstimatorWrapper estimator) throws
+ SplitSizeEstimatorWrapper estimator,
+ SplitLocationProviderWrapper locationProvider) throws
IOException, InterruptedException {
LOG.info("Grouping splits in Tez");
Preconditions.checkArgument(originalSplits != null, "Splits must be specified");
@@ -156,6 +168,9 @@ public abstract class TezSplitGrouper {
if (estimator == null) {
estimator = DEFAULT_SPLIT_ESTIMATOR;
}
+ if (locationProvider == null) {
+ locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
+ }
if (! (configNumSplits > 0 ||
originalSplits.size() == 0)) {
@@ -218,9 +233,10 @@ public abstract class TezSplitGrouper {
LOG.info("Using original number of splits: " + originalSplits.size() +
" desired splits: " + desiredNumSplits);
groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
+ // TODO TEZ-2911: a null entry inside a non-null String[] is handled differently here than in the paths where grouping happens.
for (SplitContainer split : originalSplits) {
GroupedSplitContainer newSplit =
- new GroupedSplitContainer(1, wrappedInputFormatName, split.getPreferredLocations(),
+ new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split),
null);
newSplit.addSplit(split);
groupedSplits.add(newSplit);
@@ -237,7 +253,7 @@ public abstract class TezSplitGrouper {
// go through splits and add them to locations
for (SplitContainer split : originalSplits) {
totalLength += estimator.getEstimatedSize(split);
- String[] locations = split.getPreferredLocations();
+ String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
@@ -262,7 +278,7 @@ public abstract class TezSplitGrouper {
Set<String> locSet = new HashSet<String>();
for (SplitContainer split : originalSplits) {
locSet.clear();
- String[] locations = split.getPreferredLocations();
+ String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
@@ -352,7 +368,7 @@ public abstract class TezSplitGrouper {
groupLocation = null;
} else if (doingRackLocal) {
for (SplitContainer splitH : group) {
- String[] locations = splitH.getPreferredLocations();
+ String[] locations = locationProvider.getPreferredLocations(splitH);
if (locations != null) {
for (String loc : locations) {
if (loc != null) {
@@ -436,7 +452,7 @@ public abstract class TezSplitGrouper {
}
numRackSplitsToGroup--;
rackSet.clear();
- String[] locations = split.getPreferredLocations();
+ String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index eddcc42..43776f7 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -615,4 +616,89 @@ public class TestGroupedSplits {
}
}
+
+ // Asking for 1 split from 3 originals: the splits get grouped, and the group takes the provider's location.
+ @Test (timeout = 10000)
+ public void testGroupingWithCustomLocations1() throws IOException {
+
+ int numSplits = 3;
+ InputSplit[] mockSplits = new InputSplit[numSplits];
+ InputSplit mockSplit1 = mock(InputSplit.class);
+ when(mockSplit1.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"});
+ mockSplits[0] = mockSplit1;
+ InputSplit mockSplit2 = mock(InputSplit.class);
+ when(mockSplit2.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"});
+ mockSplits[1] = mockSplit2;
+ InputSplit mockSplit3 = mock(InputSplit.class);
+ when(mockSplit3.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"});
+ mockSplits[2] = mockSplit3;
+
+ SplitLocationProvider locationProvider = new SplitLocationProvider() {
+ @Override
+ public String[] getLocations(InputSplit split) throws IOException {
+ return new String[] {"customLocation"};
+ }
+ };
+
+ TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper();
+ InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 1,
+ "MockInputForamt", null, locationProvider);
+
+ // Sanity. 1 group, with 3 splits.
+ Assert.assertEquals(1, groupedSplits.length);
+ Assert.assertTrue(groupedSplits[0] instanceof TezGroupedSplit);
+ TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[0];
+ Assert.assertEquals(3, groupedSplit.getGroupedSplits().size());
+
+ // Verify that the split ends up being grouped to the custom location.
+ Assert.assertEquals(1, groupedSplit.getLocations().length);
+ Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+ }
+
+ // Asking for 3 splits from 3 originals: the original splits are returned, each carrying the provider's location.
+ @Test (timeout = 10000)
+ public void testGroupingWithCustomLocations2() throws IOException {
+
+ int numSplits = 3;
+ InputSplit[] mockSplits = new InputSplit[numSplits];
+ InputSplit mockSplit1 = mock(InputSplit.class);
+ when(mockSplit1.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"});
+ mockSplits[0] = mockSplit1;
+ InputSplit mockSplit2 = mock(InputSplit.class);
+ when(mockSplit2.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"});
+ mockSplits[1] = mockSplit2;
+ InputSplit mockSplit3 = mock(InputSplit.class);
+ when(mockSplit3.getLength()).thenReturn(100*1000*1000L);
+ when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"});
+ mockSplits[2] = mockSplit3;
+
+ SplitLocationProvider locationProvider = new SplitLocationProvider() {
+ @Override
+ public String[] getLocations(InputSplit split) throws IOException {
+ return new String[] {"customLocation"};
+ }
+ };
+
+ TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper();
+ InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 3,
+ "MockInputForamt", null, locationProvider);
+
+ // Sanity. 3 groups, with 1 split each.
+ Assert.assertEquals(3, groupedSplits.length);
+ for (int i = 0 ; i < 3 ; i++) {
+ Assert.assertTrue(groupedSplits[i] instanceof TezGroupedSplit);
+ TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[i];
+ Assert.assertEquals(1, groupedSplit.getGroupedSplits().size());
+
+ // Verify the splits have their final location set to customLocation
+ Assert.assertEquals(1, groupedSplit.getLocations().length);
+ Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+ }
+ }
+
}