You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/12/11 23:06:26 UTC

tez git commit: TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 539b0e129 -> 6f7591b8d


TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6f7591b8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6f7591b8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6f7591b8

Branch: refs/heads/master
Commit: 6f7591b8d5e9b990c652e6dc7a7fb41dedd19179
Parents: 539b0e1
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Dec 11 14:06:09 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Dec 11 14:06:09 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/mapreduce/grouper/TezSplitGrouper.java  | 34 ++++++++++-
 .../hadoop/mapred/split/TestGroupedSplits.java  | 60 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d02aa4f..675c2b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
   TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
 
 ALL CHANGES:
+  TEZ-2911. Null location Strings can cause problems with GroupedSplit serialization.
   TEZ-2990. Change test-patch.sh to run through all tests, despite failures in upstream modules
   TEZ-2798. NPE when executing TestMemoryWithEvents::testMemoryScatterGather.
   TEZ-2963. RecoveryService#handleSummaryEvent exception with HDFS transparent encryption + kerberos authentication.

http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 848b06f..163a2a3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -148,6 +149,8 @@ public abstract class TezSplitGrouper {
     return new HashMap<String, LocationHolder>();
   }
 
+
+
   public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,
                                                       List<SplitContainer> originalSplits,
                                                       int desiredNumSplits,
@@ -233,10 +236,9 @@ public abstract class TezSplitGrouper {
       LOG.info("Using original number of splits: " + originalSplits.size() +
           " desired splits: " + desiredNumSplits);
       groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
-      // TODO TEZ-2911 null in the non null String[] handled differently here compared to when grouping happens.
       for (SplitContainer split : originalSplits) {
         GroupedSplitContainer newSplit =
-            new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split),
+            new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)),
                 null);
         newSplit.addSplit(split);
         groupedSplits.add(newSplit);
@@ -518,6 +520,34 @@ public abstract class TezSplitGrouper {
     return groupedSplits;
   }
 
+  /**
+   * Removes null entries from a split's preferred locations, since null location
+   * Strings can cause problems with GroupedSplit serialization.
+   *
+   * @param locations preferred locations of a split; may be null, empty, or
+   *                  contain null entries
+   * @return the given array itself when it has no null entries, otherwise a new
+   *         array holding the non-null entries in their original order; null
+   *         when the input is null, empty, or contains only null entries
+   */
+  private String[] cleanupLocations(String[] locations) {
+    if (locations == null || locations.length == 0) {
+      return null;
+    }
+    // Single pass: keep every usable location, preserving the original order.
+    List<String> validLocations = new LinkedList<>();
+    for (String location : locations) {
+      if (location != null) {
+        validLocations.add(location);
+      }
+    }
+    if (validLocations.size() == locations.length) {
+      // Nothing was dropped, so hand back the caller's array untouched.
+      return locations;
+    }
+    return validLocations.isEmpty() ? null : validLocations.toArray(new String[0]);
+  }
+
   /**
    * Builder that can be used to configure grouping in Tez
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/6f7591b8/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 43776f7..140a09d 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -549,6 +549,66 @@ public class TestGroupedSplits {
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
+  // No grouping: the desired split count equals the number of original splits, so
+  // each split is wrapped on its own and only its locations are cleaned up. Null
+  // location arrays and null entries must not survive, since null location Strings
+  // can cause problems with GroupedSplit serialization (TEZ-2911).
+  @Test(timeout=10000)
+  public void testGroupedSplitWithBadLocations2() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    InputFormat mockWrappedFormat = mock(InputFormat.class);
+    TezGroupedSplitsInputFormat<LongWritable , Text> format =
+        new TezGroupedSplitsInputFormat<LongWritable, Text>();
+    format.setConf(job);
+    format.setInputFormat(mockWrappedFormat);
+
+    String validLocation = "validLocation";
+    String validLocation2 = "validLocation2";
+    // One location array per split: null, only-null entries, valid, and mixed.
+    String[][] splitLocations = {
+        null,
+        {null},
+        {null, null},
+        {validLocation},
+        {validLocation, null, validLocation2}
+    };
+    int numSplits = splitLocations.length;
+    InputSplit[] mockSplits = new InputSplit[numSplits];
+    for (int i = 0 ; i < numSplits ; i++) {
+      InputSplit mockSplit = mock(InputSplit.class);
+      when(mockSplit.getLength()).thenReturn(100*1000*1000L);
+      when(mockSplit.getLocations()).thenReturn(splitLocations[i]);
+      mockSplits[i] = mockSplit;
+    }
+
+    when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+
+    format.setDesiredNumberOfSplits(numSplits);
+    InputSplit[] splits = format.getSplits(job, 1);
+    Assert.assertEquals(numSplits, splits.length);
+    for (int i = 0 ; i < numSplits ; i++) {
+      TezGroupedSplit split = (TezGroupedSplit) splits[i];
+      // no grouping happened: every grouped split wraps exactly one original split
+      Assert.assertEquals(1, split.wrappedSplits.size());
+      if (i==3) {
+        Assert.assertEquals(1, split.getLocations().length);
+        Assert.assertEquals(validLocation, split.getLocations()[0]);
+      } else if (i==4) {
+        // the null entry is dropped, both valid locations remain in order
+        Assert.assertEquals(2, split.getLocations().length);
+        Assert.assertEquals(validLocation, split.getLocations()[0]);
+        Assert.assertEquals(validLocation2, split.getLocations()[1]);
+      } else {
+        // null or all-null location arrays collapse to null
+        Assert.assertNull(split.getLocations());
+      }
+      // serialization must not trip over null location strings
+      ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+      split.write(new DataOutputStream(bOut));
+    }
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test(timeout=10000)
   public void testGroupedSplitWithEstimator() throws IOException {
     JobConf job = new JobConf(defaultConf);