You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/20 13:17:18 UTC

git commit: TEZ-682. TezGroupedSplits fails with empty (zero length) file (bikas)

Updated Branches:
  refs/heads/master cd9d4de62 -> 27fd81e4f


TEZ-682. TezGroupedSplits fails with empty (zero length) file (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/27fd81e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/27fd81e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/27fd81e4

Branch: refs/heads/master
Commit: 27fd81e4fe1575bff6c32ad148f5cb9a71d0df20
Parents: cd9d4de
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Dec 20 04:17:05 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 20 04:17:05 2013 -0800

----------------------------------------------------------------------
 .../split/TezGroupedSplitsInputFormat.java      | 19 ++++++++--
 .../split/TezGroupedSplitsInputFormat.java      | 19 ++++++++--
 .../hadoop/mapred/split/TestGroupedSplits.java  | 40 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 70658a0..d4f16a5 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -218,6 +218,9 @@ public class TezGroupedSplitsInputFormat<K, V>
         locations = emptyLocations;
       }
       for (String location : locations ) {
+        if (location == null) {
+          location = emptyLocation;
+        }
         distinctLocations.put(location, null);
       }
     }
@@ -241,6 +244,9 @@ public class TezGroupedSplitsInputFormat<K, V>
         locations = emptyLocations;
       }
       for (String location : locations) {
+        if (location == null) {
+          location = emptyLocation;
+        }
         locSet.add(location);
       }
       for (String location : locSet) {
@@ -318,18 +324,20 @@ public class TezGroupedSplitsInputFormat<K, V>
 
         // One split group created
         String[] groupLocation = {location};
-        if (doingRackLocal) {
+        if (location == emptyLocation) {
+          groupLocation = null;
+        } else if (doingRackLocal) {
           for (SplitHolder splitH : group) {
             String[] locations = splitH.split.getLocations();
             if (locations != null) {
               for (String loc : locations) {
-                groupLocationSet.add(loc);
+                if (loc != null) {
+                  groupLocationSet.add(loc);
+                }
               }
             }
           }
           groupLocation = groupLocationSet.toArray(groupLocation);
-        } else if (location == emptyLocation) {
-          groupLocation = null;
         }
         TezGroupedSplit groupedSplit = 
             new TezGroupedSplit(group.size(), wrappedInputFormatName, 
@@ -398,6 +406,9 @@ public class TezGroupedSplitsInputFormat<K, V>
             locations = emptyLocations;
           }
           for (String location : locations ) {
+            if ( location == null) {
+              location = emptyLocation;
+            }
             rackSet.add(locToRackMap.get(location));
           }
           for (String rack : rackSet) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 37fb705..2b17c11 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -201,6 +201,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
         locations = emptyLocations;
       }
       for (String location : locations ) {
+        if (location == null) {
+          location = emptyLocation;
+        }
         distinctLocations.put(location, null);
       }
     }
@@ -224,6 +227,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
         locations = emptyLocations;
       }
       for (String location : locations) {
+        if (location == null) {
+          location = emptyLocation;
+        }
         locSet.add(location);
       }
       for (String location : locSet) {
@@ -301,18 +307,20 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
 
         // One split group created
         String[] groupLocation = {location};
-        if (doingRackLocal) {
+        if (location == emptyLocation) {
+          groupLocation = null;
+        } else if (doingRackLocal) {
           for (SplitHolder splitH : group) {
             String[] locations = splitH.split.getLocations();
             if (locations != null) {
               for (String loc : locations) {
-                groupLocationSet.add(loc);
+                if (loc != null) {
+                  groupLocationSet.add(loc);
+                }
               }
             }
           }
           groupLocation = groupLocationSet.toArray(groupLocation);
-        } else if (location == emptyLocation) {
-          groupLocation = null;
         }
         TezGroupedSplit groupedSplit = 
             new TezGroupedSplit(group.size(), wrappedInputFormatName, 
@@ -381,6 +389,9 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
             locations = emptyLocations;
           }
           for (String location : locations ) {
+            if (location == null) {
+              location = emptyLocation;
+            }
             rackSet.add(locToRackMap.get(location));
           }
           for (String rack : rackSet) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/27fd81e4/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index ab9124a..40ce30d 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred.split;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -357,5 +359,43 @@ public class TestGroupedSplits {
     Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
     Assert.assertEquals(numSplits, splitSet.size());
   }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test(timeout=10000)
+  public void testGroupedSplitWithBadLocations() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    InputFormat mockWrappedFormat = mock(InputFormat.class);
+    TezGroupedSplitsInputFormat<LongWritable , Text> format = 
+        new TezGroupedSplitsInputFormat<LongWritable, Text>();
+    format.setConf(job);
+    format.setInputFormat(mockWrappedFormat);
+    
+    // put multiple splits with multiple copies in the same location
+    int numSplits = 3;
+    InputSplit[] mockSplits = new InputSplit[numSplits];
+    InputSplit mockSplit1 = mock(InputSplit.class);
+    when(mockSplit1.getLength()).thenReturn(10*1000*1000l);
+    when(mockSplit1.getLocations()).thenReturn(null);
+    mockSplits[0] = mockSplit1;
+    InputSplit mockSplit2 = mock(InputSplit.class);
+    when(mockSplit2.getLength()).thenReturn(10*1000*1000l);
+    when(mockSplit2.getLocations()).thenReturn(new String[] {null});
+    mockSplits[1] = mockSplit2;
+    InputSplit mockSplit3 = mock(InputSplit.class);
+    when(mockSplit3.getLength()).thenReturn(10*1000*1000l);
+    when(mockSplit3.getLocations()).thenReturn(new String[] {null, null});
+    mockSplits[2] = mockSplit3;
+
+    when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+    
+    format.setDesiredNumberOfSplits(1);
+    InputSplit[] splits = format.getSplits(job, 1);
+    Assert.assertEquals(1, splits.length);
+    TezGroupedSplit split = (TezGroupedSplit) splits[0];
+    // all 3 splits are present
+    Assert.assertEquals(numSplits, split.wrappedSplits.size());
+    ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+    split.write(new DataOutputStream(bOut));
+  }
 
 }