You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/12/11 23:06:26 UTC
tez git commit: TEZ-2911. Null location Strings can cause problems
with GroupedSplit serialization. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 539b0e129 -> 6f7591b8d
TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6f7591b8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6f7591b8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6f7591b8
Branch: refs/heads/master
Commit: 6f7591b8d5e9b990c652e6dc7a7fb41dedd19179
Parents: 539b0e1
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Dec 11 14:06:09 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Dec 11 14:06:09 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/grouper/TezSplitGrouper.java | 34 ++++++++++-
.../hadoop/mapred/split/TestGroupedSplits.java | 60 ++++++++++++++++++++
3 files changed, 93 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d02aa4f..675c2b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
ALL CHANGES:
+ TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization.
TEZ-2990. Change test-patch.sh to run through all tests, despite failures in upstream modules
TEZ-2798. NPE when executing TestMemoryWithEvents::testMemoryScatterGather.
TEZ-2963. RecoveryService#handleSummaryEvent exception with HDFS transparent encryption + kerberos authentication.
http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 848b06f..163a2a3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -148,6 +149,8 @@ public abstract class TezSplitGrouper {
return new HashMap<String, LocationHolder>();
}
+
+
public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,
List<SplitContainer> originalSplits,
int desiredNumSplits,
@@ -233,10 +236,9 @@ public abstract class TezSplitGrouper {
LOG.info("Using original number of splits: " + originalSplits.size() +
" desired splits: " + desiredNumSplits);
groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
- // TODO TEZ-2911 null in the non null String[] handled differently here compared to when grouping happens.
for (SplitContainer split : originalSplits) {
GroupedSplitContainer newSplit =
- new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split),
+ new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)),
null);
newSplit.addSplit(split);
groupedSplits.add(newSplit);
@@ -518,6 +520,34 @@ public abstract class TezSplitGrouper {
return groupedSplits;
}
+ private String[] cleanupLocations(String[] locations) { // Removes null entries from a location array; yields null when no usable location remains.
+ if (locations == null || locations.length == 0) { // null or empty input is reported as "no locations" (null)
+ return null;
+ }
+ boolean nullLocationFound = false;
+ for (String location : locations) { // first pass: only detect whether any entry is null
+ if (location == null) {
+ nullLocationFound = true;
+ break;
+ }
+ }
+ if (!nullLocationFound) {
+ return locations; // no nulls: the same array instance is returned, not a copy
+ } else {
+ List<String> newLocations = new LinkedList<>(); // second pass: collect the non-null entries in their original order
+ for (String location : locations) {
+ if (location != null) {
+ newLocations.add(location);
+ }
+ }
+ if (newLocations.size() == 0) { // every entry was null: treated the same as a null/empty input
+ return null;
+ } else {
+ return newLocations.toArray(new String[newLocations.size()]);
+ }
+ }
+ }
+
/**
* Builder that can be used to configure grouping in Tez
*
http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 43776f7..140a09d 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -549,6 +549,66 @@ public class TestGroupedSplits {
}
@SuppressWarnings({ "rawtypes", "unchecked" })
+ // No grouping
+ @Test(timeout=10000)
+ public void testGroupedSplitWithBadLocations2() throws IOException {
+ JobConf job = new JobConf(defaultConf);
+ InputFormat mockWrappedFormat = mock(InputFormat.class);
+ TezGroupedSplitsInputFormat<LongWritable , Text> format =
+ new TezGroupedSplitsInputFormat<LongWritable, Text>();
+ format.setConf(job);
+ format.setInputFormat(mockWrappedFormat);
+
+ // build mock splits covering null, all-null, valid, and mixed null/valid location arrays
+ String validLocation = "validLocation";
+ String validLocation2 = "validLocation2";
+ int numSplits = 5;
+ InputSplit[] mockSplits = new InputSplit[numSplits];
+ InputSplit mockSplit1 = mock(InputSplit.class);
+ when(mockSplit1.getLength()).thenReturn(100*1000*1000l);
+ when(mockSplit1.getLocations()).thenReturn(null);
+ mockSplits[0] = mockSplit1;
+ InputSplit mockSplit2 = mock(InputSplit.class);
+ when(mockSplit2.getLength()).thenReturn(100*1000*1000l);
+ when(mockSplit2.getLocations()).thenReturn(new String[] {null});
+ mockSplits[1] = mockSplit2;
+ InputSplit mockSplit3 = mock(InputSplit.class);
+ when(mockSplit3.getLength()).thenReturn(100*1000*1000l);
+ when(mockSplit3.getLocations()).thenReturn(new String[] {null, null});
+ mockSplits[2] = mockSplit3;
+ InputSplit mockSplit4 = mock(InputSplit.class);
+ when(mockSplit4.getLength()).thenReturn(100*1000*1000l);
+ when(mockSplit4.getLocations()).thenReturn(new String[] {validLocation});
+ mockSplits[3] = mockSplit4;
+ InputSplit mockSplit5 = mock(InputSplit.class);
+ when(mockSplit5.getLength()).thenReturn(100*1000*1000l);
+ when(mockSplit5.getLocations()).thenReturn(new String[] {validLocation, null, validLocation2});
+ mockSplits[4] = mockSplit4; // NOTE(review): stores mockSplit4 again, so mockSplit5 (mixed null/valid locations) is never placed in the array - confirm whether mockSplit5 was intended
+
+ when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+
+ format.setDesiredNumberOfSplits(numSplits);
+ InputSplit[] splits = format.getSplits(job, 1);
+ Assert.assertEquals(numSplits, splits.length);
+ for (int i = 0 ; i < numSplits ; i++) {
+ TezGroupedSplit split = (TezGroupedSplit) splits[i];
+ // no grouping: each returned split is expected to wrap exactly one original split
+ Assert.assertEquals(1, split.wrappedSplits.size());
+ if (i==3) {
+ Assert.assertEquals(1, split.getLocations().length);
+ Assert.assertEquals(validLocation, split.getLocations()[0]);
+ } else if (i==4) {
+ Assert.assertEquals(1, split.getLocations().length);
+ Assert.assertTrue(split.getLocations()[0].equals(validLocation) || split.getLocations()[0].equals(validLocation2));
+ } else {
+ Assert.assertNull(split.getLocations());
+ }
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ split.write(new DataOutputStream(bOut)); // only checks that write() completes without throwing; the written bytes are not inspected
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout=10000)
public void testGroupedSplitWithEstimator() throws IOException {
JobConf job = new JobConf(defaultConf);