You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/19 11:52:02 UTC

git commit: TEZ-681. Grouping generates incorrect splits if multiple DNs run on a single node (bikas)

Updated Branches:
  refs/heads/master 3d9601b83 -> cd9d4de62


TEZ-681. Grouping generates incorrect splits if multiple DNs run on a single node (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cd9d4de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cd9d4de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cd9d4de6

Branch: refs/heads/master
Commit: cd9d4de62c7a9290a5067c4358878ca915c7a9d2
Parents: 3d9601b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Dec 19 01:49:06 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Dec 19 01:49:06 2013 -0800

----------------------------------------------------------------------
 .../mapred/split/TezGroupedSplitsInputFormat.java    | 11 ++++--
 .../mapreduce/split/TezGroupedSplitsInputFormat.java | 11 ++++--
 .../hadoop/mapred/split/TestGroupedSplits.java       | 37 +++++++++++++++++++-
 3 files changed, 54 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index dec8e3f..70658a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -229,16 +229,21 @@ public class TezGroupedSplitsInputFormat<K, V>
 
     // allocation loop here so that we have a good initial size for the lists
     for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation));
+      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
     }
     
+    Set<String> locSet = new HashSet<String>();
     for (InputSplit split : originalSplits) {
+      locSet.clear();
       SplitHolder splitHolder = new SplitHolder(split);
       String[] locations = split.getLocations();
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
       }
-      for (String location : locations ) {
+      for (String location : locations) {
+        locSet.add(location);
+      }
+      for (String location : locSet) {
         LocationHolder holder = distinctLocations.get(location);
         holder.splits.add(splitHolder);
       }
@@ -333,6 +338,8 @@ public class TezGroupedSplitsInputFormat<K, V>
                 ((doingRackLocal && location != emptyLocation)?location:null));
         for (SplitHolder groupedSplitHolder : group) {
           groupedSplit.addSplit(groupedSplitHolder.split);
+          Preconditions.checkState(groupedSplitHolder.isProcessed == false, 
+              "Duplicates in grouping at location: " + location);
           groupedSplitHolder.isProcessed = true;
           splitsProcessed++;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index bb03c78..37fb705 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -212,16 +212,21 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
     
     // allocation loop here so that we have a good initial size for the lists
     for (String location : distinctLocations.keySet()) {
-      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation));
+      distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
     }
     
+    Set<String> locSet = new HashSet<String>();
     for (InputSplit split : originalSplits) {
+      locSet.clear();
       SplitHolder splitHolder = new SplitHolder(split);
       String[] locations = split.getLocations();
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
       }
-      for (String location : locations ) {
+      for (String location : locations) {
+        locSet.add(location);
+      }
+      for (String location : locSet) {
         LocationHolder holder = distinctLocations.get(location);
         holder.splits.add(splitHolder);
       }
@@ -316,6 +321,8 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
                 ((doingRackLocal && location != emptyLocation)?location:null));
         for (SplitHolder groupedSplitHolder : group) {
           groupedSplit.addSplit(groupedSplitHolder.split);
+          Preconditions.checkState(groupedSplitHolder.isProcessed == false,
+              "Duplicates in grouping at location: " + location);
           groupedSplitHolder.isProcessed = true;
           splitsProcessed++;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cd9d4de6/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index b7d09e1..ab9124a 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 import static org.mockito.Mockito.*;
 
 public class TestGroupedSplits {
@@ -285,7 +288,7 @@ public class TestGroupedSplits {
   }  
   
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test//(timeout=10000)
+  @Test(timeout=10000)
   public void testGroupedSplitSize() throws IOException {
     JobConf job = new JobConf(defaultConf);
     InputFormat mockWrappedFormat = mock(InputFormat.class);
@@ -323,4 +326,36 @@ public class TestGroupedSplits {
     
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test(timeout=10000)
+  public void testGroupedSplitWithDuplicates() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    InputFormat mockWrappedFormat = mock(InputFormat.class);
+    TezGroupedSplitsInputFormat<LongWritable , Text> format = 
+        new TezGroupedSplitsInputFormat<LongWritable, Text>();
+    format.setConf(job);
+    format.setInputFormat(mockWrappedFormat);
+    
+    // put multiple splits with multiple copies in the same location
+    String[] locations = {"common", "common", "common"};
+    int numSplits = 3;
+    InputSplit[] mockSplits = new InputSplit[numSplits];
+    for (int i=0; i<numSplits; i++) {
+      InputSplit mockSplit = mock(InputSplit.class);
+      when(mockSplit.getLength()).thenReturn(10*1000*1000l);
+      when(mockSplit.getLocations()).thenReturn(locations);
+      mockSplits[i] = mockSplit;
+    }
+    when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
+    
+    format.setDesiredNumberOfSplits(1);
+    InputSplit[] splits = format.getSplits(job, 1);
+    Assert.assertEquals(1, splits.length);
+    TezGroupedSplit split = (TezGroupedSplit) splits[0];
+    // all 3 splits are present
+    Assert.assertEquals(numSplits, split.wrappedSplits.size());
+    Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
+    Assert.assertEquals(numSplits, splitSet.size());
+  }
+
 }