You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:50 UTC

[44/50] [abbrv] git commit: TEZ-1612. ShuffleVertexManager's EdgeManager should not hard code source num tasks (bikas)

TEZ-1612. ShuffleVertexManager's EdgeManager should not hard code source num tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a156c6eb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a156c6eb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a156c6eb

Branch: refs/heads/branch-0.5
Commit: a156c6eb5a0c88babc221ef83ee7d9e3a173cdc7
Parents: b679052
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 25 16:18:51 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 25 16:18:51 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                            |  2 ++
 .../library/vertexmanager/ShuffleVertexManager.java    | 13 +++++--------
 .../src/main/proto/ShufflePayloads.proto               |  5 ++---
 .../vertexmanager/TestShuffleVertexManager.java        |  4 ++--
 4 files changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a156c6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f790290..87711db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,6 +30,8 @@ ALL CHANGES:
   TEZ-1611. Change DataSource/Sink to be able to supply URIs for credentials
   TEZ-1592. Vertex should wait for all initializers to finish before moving to
   INITED state
+  TEZ-1612. ShuffleVertexManager's EdgeManager should not hard code source num
+  tasks
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a156c6eb/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 2aaae16..eeb3676 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -19,7 +19,9 @@
 package org.apache.tez.dag.library.vertexmanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -166,7 +168,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       this.numDestinationTasks = config.numDestinationTasks;
       this.basePartitionRange = config.basePartitionRange;
       this.remainderRangeForLastShuffler = config.remainderRangeForLastShuffler;
-      this.numSourceTasks = config.numSourceTasks;
+      this.numSourceTasks = getContext().getSourceVertexNumTasks();
+      Preconditions.checkState(this.numDestinationTasks == getContext().getDestinationVertexNumTasks());
     }
 
     @Override
@@ -266,18 +269,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     int numDestinationTasks;
     int basePartitionRange;
     int remainderRangeForLastShuffler;
-    int numSourceTasks;
 
     private CustomShuffleEdgeManagerConfig(int numSourceTaskOutputs,
         int numDestinationTasks,
-        int numSourceTasks,
         int basePartitionRange,
         int remainderRangeForLastShuffler) {
       this.numSourceTaskOutputs = numSourceTaskOutputs;
       this.numDestinationTasks = numDestinationTasks;
       this.basePartitionRange = basePartitionRange;
       this.remainderRangeForLastShuffler = remainderRangeForLastShuffler;
-      this.numSourceTasks = numSourceTasks;
     }
 
     public UserPayload toUserPayload() {
@@ -287,7 +287,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
               .setNumDestinationTasks(numDestinationTasks)
               .setBasePartitionRange(basePartitionRange)
               .setRemainderRangeForLastShuffler(remainderRangeForLastShuffler)
-              .setNumSourceTasks(numSourceTasks)
               .build().toByteArray()));
     }
 
@@ -298,7 +297,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return new CustomShuffleEdgeManagerConfig(
           proto.getNumSourceTaskOutputs(),
           proto.getNumDestinationTasks(),
-          proto.getNumSourceTasks(),
           proto.getBasePartitionRange(),
           proto.getRemainderRangeForLastShuffler());
 
@@ -464,8 +462,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         // for the source tasks
         CustomShuffleEdgeManagerConfig edgeManagerConfig =
             new CustomShuffleEdgeManagerConfig(
-                currentParallelism, finalTaskParallelism, 
-                getContext().getVertexNumTasks(vertex), basePartitionRange,
+                currentParallelism, finalTaskParallelism, basePartitionRange,
                 ((remainderRangeForLastShuffler > 0) ?
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerPluginDescriptor edgeManagerDescriptor =

http://git-wip-us.apache.org/repos/asf/tez/blob/a156c6eb/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 2d658ba..9c711bb 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -46,7 +46,6 @@ message VertexManagerEventPayloadProto {
 message ShuffleEdgeManagerConfigPayloadProto {
   optional int32 num_source_task_outputs = 1;
   optional int32 num_destination_tasks = 2;
-  optional int32 num_source_tasks = 3;
-  optional int32 base_partition_range = 4;
-  optional int32 remainder_range_for_last_shuffler = 5;
+  optional int32 base_partition_range = 3;
+  optional int32 remainder_range_for_last_shuffler = 4;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a156c6eb/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9ac8210..6d065fc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -186,12 +186,12 @@ public class TestShuffleVertexManager {
 
               @Override
               public int getSourceVertexNumTasks() {
-                return 0;
+                return 2;
               }
 
               @Override
               public int getDestinationVertexNumTasks() {
-                return 0;
+                return 2;
               }
             };
             EdgeManagerPlugin edgeManager = ReflectionUtils