You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/20 13:17:18 UTC
git commit: TEZ-682. TezGroupedSplits fails with empty (zero length) file (bikas)
Updated Branches:
refs/heads/master cd9d4de62 -> 27fd81e4f
TEZ-682. TezGroupedSplits fails with empty (zero length) file (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/27fd81e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/27fd81e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/27fd81e4
Branch: refs/heads/master
Commit: 27fd81e4fe1575bff6c32ad148f5cb9a71d0df20
Parents: cd9d4de
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Dec 20 04:17:05 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 20 04:17:05 2013 -0800
----------------------------------------------------------------------
.../hadoop/mapred/split/TezGroupedSplitsInputFormat.java | 19 ++++++++--
.../hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java | 19 ++++++++--
.../hadoop/mapred/split/TestGroupedSplits.java | 40 ++++++++++++++++++++
3 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 70658a0..d4f16a5 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -218,6 +218,9 @@ public class TezGroupedSplitsInputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations ) {
+ if (location == null) {
+ location = emptyLocation;
+ }
distinctLocations.put(location, null);
}
}
@@ -241,6 +244,9 @@ public class TezGroupedSplitsInputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations) {
+ if (location == null) {
+ location = emptyLocation;
+ }
locSet.add(location);
}
for (String location : locSet) {
@@ -318,18 +324,20 @@ public class TezGroupedSplitsInputFormat<K, V>
// One split group created
String[] groupLocation = {location};
- if (doingRackLocal) {
+ if (location == emptyLocation) {
+ groupLocation = null;
+ } else if (doingRackLocal) {
for (SplitHolder splitH : group) {
String[] locations = splitH.split.getLocations();
if (locations != null) {
for (String loc : locations) {
- groupLocationSet.add(loc);
+ if (loc != null) {
+ groupLocationSet.add(loc);
+ }
}
}
}
groupLocation = groupLocationSet.toArray(groupLocation);
- } else if (location == emptyLocation) {
- groupLocation = null;
}
TezGroupedSplit groupedSplit =
new TezGroupedSplit(group.size(), wrappedInputFormatName,
@@ -398,6 +406,9 @@ public class TezGroupedSplitsInputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations ) {
+ if ( location == null) {
+ location = emptyLocation;
+ }
rackSet.add(locToRackMap.get(location));
}
for (String rack : rackSet) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 37fb705..2b17c11 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -201,6 +201,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations ) {
+ if (location == null) {
+ location = emptyLocation;
+ }
distinctLocations.put(location, null);
}
}
@@ -224,6 +227,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations) {
+ if (location == null) {
+ location = emptyLocation;
+ }
locSet.add(location);
}
for (String location : locSet) {
@@ -301,18 +307,20 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
// One split group created
String[] groupLocation = {location};
- if (doingRackLocal) {
+ if (location == emptyLocation) {
+ groupLocation = null;
+ } else if (doingRackLocal) {
for (SplitHolder splitH : group) {
String[] locations = splitH.split.getLocations();
if (locations != null) {
for (String loc : locations) {
- groupLocationSet.add(loc);
+ if (loc != null) {
+ groupLocationSet.add(loc);
+ }
}
}
}
groupLocation = groupLocationSet.toArray(groupLocation);
- } else if (location == emptyLocation) {
- groupLocation = null;
}
TezGroupedSplit groupedSplit =
new TezGroupedSplit(group.size(), wrappedInputFormatName,
@@ -381,6 +389,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
locations = emptyLocations;
}
for (String location : locations ) {
+ if (location == null) {
+ location = emptyLocation;
+ }
rackSet.add(locToRackMap.get(location));
}
for (String rack : rackSet) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index ab9124a..40ce30d 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred.split;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -357,5 +359,43 @@ public class TestGroupedSplits {
Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
Assert.assertEquals(numSplits, splitSet.size());
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test(timeout=10000)
+ public void testGroupedSplitWithBadLocations() throws IOException {
+ JobConf job = new JobConf(defaultConf);
+ InputFormat mockWrappedFormat = mock(InputFormat.class);
+ TezGroupedSplitsInputFormat<LongWritable , Text> format =
+ new TezGroupedSplitsInputFormat<LongWritable, Text>();
+ format.setConf(job);
+ format.setInputFormat(mockWrappedFormat);
+
+ // put multiple splits with multiple copies in the same location
+ int numSplits = 3;
+ InputSplit[] mockSplits = new InputSplit[numSplits];
+ InputSplit mockSplit1 = mock(InputSplit.class);
+ when(mockSplit1.getLength()).thenReturn(10*1000*1000l);
+ when(mockSplit1.getLocations()).thenReturn(null);
+ mockSplits[0] = mockSplit1;
+ InputSplit mockSplit2 = mock(InputSplit.class);
+ when(mockSplit2.getLength()).thenReturn(10*1000*1000l);
+ when(mockSplit2.getLocations()).thenReturn(new String[] {null});
+ mockSplits[1] = mockSplit2;
+ InputSplit mockSplit3 = mock(InputSplit.class);
+ when(mockSplit3.getLength()).thenReturn(10*1000*1000l);
+ when(mockSplit3.getLocations()).thenReturn(new String[] {null, null});
+ mockSplits[2] = mockSplit3;
+
+ when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+
+ format.setDesiredNumberOfSplits(1);
+ InputSplit[] splits = format.getSplits(job, 1);
+ Assert.assertEquals(1, splits.length);
+ TezGroupedSplit split = (TezGroupedSplit) splits[0];
+ // all 3 splits are present
+ Assert.assertEquals(numSplits, split.wrappedSplits.size());
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ split.write(new DataOutputStream(bOut));
+ }
}