You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/29 03:50:27 UTC

git commit: TEZ-654. Incorrect target index calculation after auto-reduce parallelism (bikas)

Updated Branches:
  refs/heads/master c4218172f -> c046a6489


TEZ-654. Incorrect target index calculation after auto-reduce parallelism (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c046a648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c046a648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c046a648

Branch: refs/heads/master
Commit: c046a6489d2a8daae3c2aecd384779ba0760bd02
Parents: c421817
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Nov 28 18:12:25 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Nov 28 18:13:50 2013 -0800

----------------------------------------------------------------------
 .../dag/app/dag/impl/ShuffleVertexManager.java  | 21 +++++++++++++++-----
 .../dag/app/dag/impl/TestVertexScheduler.java   | 17 ++++++++++++++++
 .../mapreduce/client/ResourceMgrDelegate.java   |  8 +++++---
 3 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index 26ee64b..c83273a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -131,11 +131,17 @@ public class ShuffleVertexManager implements VertexScheduler {
         int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
       int sourceIndex = event.getSourceIndex();
       int destinationTaskIndex = sourceIndex/basePartitionRange;
+      int partitionRange = 1;
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler;
+      }
       
       // all inputs from a source task are next to each other in original order
       int targetIndex = 
-          sourceTaskIndex * basePartitionRange 
-          + sourceIndex % basePartitionRange;
+          sourceTaskIndex * partitionRange 
+          + sourceIndex % partitionRange;
       
       event.setTargetIndex(targetIndex);
       taskIndices.add(new Integer(destinationTaskIndex));
@@ -146,10 +152,15 @@ public class ShuffleVertexManager implements VertexScheduler {
         int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
       int sourceIndex = event.getSourceIndex();
       int destinationTaskIndex = sourceIndex/basePartitionRange;
-      
+      int partitionRange = 1;
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler;
+      }
       int targetIndex = 
-          sourceTaskIndex * basePartitionRange 
-          + sourceIndex % basePartitionRange;
+          sourceTaskIndex * partitionRange 
+          + sourceIndex % partitionRange;
       
       event.setTargetIndex(targetIndex);
       taskIndices.add(new Integer(destinationTaskIndex));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 6c0ee10..301a79b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.junit.Assert;
@@ -46,6 +48,8 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Lists;
+
 import static org.mockito.Mockito.*;
 
 public class TestVertexScheduler {
@@ -223,6 +227,19 @@ public class TestVertexScheduler {
     // more completions dont cause recalculation of parallelism
     scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
     verify(mockManagedVertex).setParallelism(eq(2), anyMap());
+    Assert.assertEquals(2, newEdgeManagers.size());
+    
+    EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
+    List<Integer> targets = Lists.newArrayList();
+    DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
+    edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
+    Assert.assertEquals(3, dmEvent.getTargetIndex());
+    Assert.assertEquals(0, targets.get(0).intValue());
+    targets.clear();
+    dmEvent = new DataMovementEvent(2, new byte[0]);
+    edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
+    Assert.assertEquals(0, dmEvent.getTargetIndex());
+    Assert.assertEquals(1, targets.get(0).intValue());    
   }
   
   @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
index d3b2c08..ac131a1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -144,9 +144,11 @@ public class ResourceMgrDelegate {
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
     try {
-      return TypeConverter.fromYarn(
-          client.getQueueInfo(queueName), this.conf);
-    } catch (YarnException e) {
+      org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+          client.getQueueInfo(queueName);
+      return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo,
+          conf);
+      } catch (YarnException e) {
       throw new IOException(e);
     }
   }