You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/19 11:52:02 UTC
git commit: TEZ-681. Grouping generates incorrect splits if multiple DNs run on a single node (bikas)
Updated Branches:
refs/heads/master 3d9601b83 -> cd9d4de62
TEZ-681. Grouping generates incorrect splits if multiple DNs run on a single node (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cd9d4de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cd9d4de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cd9d4de6
Branch: refs/heads/master
Commit: cd9d4de62c7a9290a5067c4358878ca915c7a9d2
Parents: 3d9601b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Dec 19 01:49:06 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Dec 19 01:49:06 2013 -0800
----------------------------------------------------------------------
.../hadoop/mapred/split/TezGroupedSplitsInputFormat.java | 11 ++++--
.../hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java | 11 ++++--
.../hadoop/mapred/split/TestGroupedSplits.java | 37 +++++++++++++++++++-
3 files changed, 54 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index dec8e3f..70658a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -229,16 +229,21 @@ public class TezGroupedSplitsInputFormat<K, V>
// allocation loop here so that we have a good initial size for the lists
for (String location : distinctLocations.keySet()) {
- distinctLocations.put(location, new LocationHolder(numSplitsPerLocation));
+ distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
}
+ Set<String> locSet = new HashSet<String>();
for (InputSplit split : originalSplits) {
+ locSet.clear();
SplitHolder splitHolder = new SplitHolder(split);
String[] locations = split.getLocations();
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
- for (String location : locations ) {
+ for (String location : locations) {
+ locSet.add(location);
+ }
+ for (String location : locSet) {
LocationHolder holder = distinctLocations.get(location);
holder.splits.add(splitHolder);
}
@@ -333,6 +338,8 @@ public class TezGroupedSplitsInputFormat<K, V>
((doingRackLocal && location != emptyLocation)?location:null));
for (SplitHolder groupedSplitHolder : group) {
groupedSplit.addSplit(groupedSplitHolder.split);
+ Preconditions.checkState(groupedSplitHolder.isProcessed == false,
+ "Duplicates in grouping at location: " + location);
groupedSplitHolder.isProcessed = true;
splitsProcessed++;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index bb03c78..37fb705 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -212,16 +212,21 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
// allocation loop here so that we have a good initial size for the lists
for (String location : distinctLocations.keySet()) {
- distinctLocations.put(location, new LocationHolder(numSplitsPerLocation));
+ distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
}
+ Set<String> locSet = new HashSet<String>();
for (InputSplit split : originalSplits) {
+ locSet.clear();
SplitHolder splitHolder = new SplitHolder(split);
String[] locations = split.getLocations();
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
- for (String location : locations ) {
+ for (String location : locations) {
+ locSet.add(location);
+ }
+ for (String location : locSet) {
LocationHolder holder = distinctLocations.get(location);
holder.splits.add(splitHolder);
}
@@ -316,6 +321,8 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
((doingRackLocal && location != emptyLocation)?location:null));
for (SplitHolder groupedSplitHolder : group) {
groupedSplit.addSplit(groupedSplitHolder.split);
+ Preconditions.checkState(groupedSplitHolder.isProcessed == false,
+ "Duplicates in grouping at location: " + location);
groupedSplitHolder.isProcessed = true;
splitsProcessed++;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index b7d09e1..ab9124a 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,8 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
import static org.mockito.Mockito.*;
public class TestGroupedSplits {
@@ -285,7 +288,7 @@ public class TestGroupedSplits {
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- @Test//(timeout=10000)
+ @Test(timeout=10000)
public void testGroupedSplitSize() throws IOException {
JobConf job = new JobConf(defaultConf);
InputFormat mockWrappedFormat = mock(InputFormat.class);
@@ -323,4 +326,36 @@ public class TestGroupedSplits {
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test(timeout=10000)
+ public void testGroupedSplitWithDuplicates() throws IOException {
+ JobConf job = new JobConf(defaultConf);
+ InputFormat mockWrappedFormat = mock(InputFormat.class);
+ TezGroupedSplitsInputFormat<LongWritable , Text> format =
+ new TezGroupedSplitsInputFormat<LongWritable, Text>();
+ format.setConf(job);
+ format.setInputFormat(mockWrappedFormat);
+
+ // put multiple splits with multiple copies in the same location
+ String[] locations = {"common", "common", "common"};
+ int numSplits = 3;
+ InputSplit[] mockSplits = new InputSplit[numSplits];
+ for (int i=0; i<numSplits; i++) {
+ InputSplit mockSplit = mock(InputSplit.class);
+ when(mockSplit.getLength()).thenReturn(10*1000*1000l);
+ when(mockSplit.getLocations()).thenReturn(locations);
+ mockSplits[i] = mockSplit;
+ }
+ when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+
+ format.setDesiredNumberOfSplits(1);
+ InputSplit[] splits = format.getSplits(job, 1);
+ Assert.assertEquals(1, splits.length);
+ TezGroupedSplit split = (TezGroupedSplit) splits[0];
+ // all 3 splits are present
+ Assert.assertEquals(numSplits, split.wrappedSplits.size());
+ Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
+ Assert.assertEquals(numSplits, splitSet.size());
+ }
+
}