You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/29 03:50:27 UTC
git commit: TEZ-654. Incorrect target index calculation after
auto-reduce parallelism (bikas)
Updated Branches:
refs/heads/master c4218172f -> c046a6489
TEZ-654. Incorrect target index calculation after auto-reduce parallelism (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c046a648
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c046a648
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c046a648
Branch: refs/heads/master
Commit: c046a6489d2a8daae3c2aecd384779ba0760bd02
Parents: c421817
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Nov 28 18:12:25 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Nov 28 18:13:50 2013 -0800
----------------------------------------------------------------------
.../dag/app/dag/impl/ShuffleVertexManager.java | 21 +++++++++++++++-----
.../dag/app/dag/impl/TestVertexScheduler.java | 17 ++++++++++++++++
.../mapreduce/client/ResourceMgrDelegate.java | 8 +++++---
3 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index 26ee64b..c83273a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -131,11 +131,17 @@ public class ShuffleVertexManager implements VertexScheduler {
int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
int sourceIndex = event.getSourceIndex();
int destinationTaskIndex = sourceIndex/basePartitionRange;
+ int partitionRange = 1;
+ if(destinationTaskIndex < numDestinationTasks-1) {
+ partitionRange = basePartitionRange;
+ } else {
+ partitionRange = remainderRangeForLastShuffler;
+ }
// all inputs from a source task are next to each other in original order
int targetIndex =
- sourceTaskIndex * basePartitionRange
- + sourceIndex % basePartitionRange;
+ sourceTaskIndex * partitionRange
+ + sourceIndex % partitionRange;
event.setTargetIndex(targetIndex);
taskIndices.add(new Integer(destinationTaskIndex));
@@ -146,10 +152,15 @@ public class ShuffleVertexManager implements VertexScheduler {
int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
int sourceIndex = event.getSourceIndex();
int destinationTaskIndex = sourceIndex/basePartitionRange;
-
+ int partitionRange = 1;
+ if(destinationTaskIndex < numDestinationTasks-1) {
+ partitionRange = basePartitionRange;
+ } else {
+ partitionRange = remainderRangeForLastShuffler;
+ }
int targetIndex =
- sourceTaskIndex * basePartitionRange
- + sourceIndex % basePartitionRange;
+ sourceTaskIndex * partitionRange
+ + sourceIndex % partitionRange;
event.setTargetIndex(targetIndex);
taskIndices.add(new Integer(destinationTaskIndex));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 6c0ee10..301a79b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.junit.Assert;
@@ -46,6 +48,8 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.Lists;
+
import static org.mockito.Mockito.*;
public class TestVertexScheduler {
@@ -223,6 +227,19 @@ public class TestVertexScheduler {
// more completions dont cause recalculation of parallelism
scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
verify(mockManagedVertex).setParallelism(eq(2), anyMap());
+ Assert.assertEquals(2, newEdgeManagers.size());
+
+ EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
+ List<Integer> targets = Lists.newArrayList();
+ DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
+ edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
+ Assert.assertEquals(3, dmEvent.getTargetIndex());
+ Assert.assertEquals(0, targets.get(0).intValue());
+ targets.clear();
+ dmEvent = new DataMovementEvent(2, new byte[0]);
+ edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
+ Assert.assertEquals(0, dmEvent.getTargetIndex());
+ Assert.assertEquals(1, targets.get(0).intValue());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c046a648/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
index d3b2c08..ac131a1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -144,9 +144,11 @@ public class ResourceMgrDelegate {
public QueueInfo getQueue(String queueName) throws IOException,
InterruptedException {
try {
- return TypeConverter.fromYarn(
- client.getQueueInfo(queueName), this.conf);
- } catch (YarnException e) {
+ org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+ client.getQueueInfo(queueName);
+ return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo,
+ conf);
+ } catch (YarnException e) {
throw new IOException(e);
}
}