You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/06 03:55:50 UTC

tez git commit: TEZ-2036. OneToOneEdgeManager should enforce that source and destination vertices have the same number of tasks (bikas) (cherry picked from commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)

Repository: tez
Updated Branches:
  refs/heads/branch-0.6 a7be03981 -> 5ac89e34e


TEZ-2036. OneToOneEdgeManager should enforce that source and destination vertices have the same number of tasks (bikas)
(cherry picked from commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)

Conflicts: Accepted incoming change
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ac89e34
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ac89e34
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ac89e34

Branch: refs/heads/branch-0.6
Commit: 5ac89e34edc6aa84726974365c16ac5a73df8670
Parents: a7be039
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Feb 5 18:50:42 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Feb 5 18:55:27 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  8 ++++
 .../apache/tez/dag/app/dag/impl/TestEdge.java   | 39 +++++++++++++++++++-
 3 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c7d882..db88c25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -128,7 +128,9 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
-  TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo
+  TEZ-2036. OneToOneEdgeManager should enforce that source and destination
+  tasks have same number
+  TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
   TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
   before sending notification

http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index ced481d..11a6483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -27,6 +27,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
+import com.google.common.base.Preconditions;
+
 public class OneToOneEdgeManager extends EdgeManagerPlugin {
 
   List<Integer> destinationInputIndices = 
@@ -55,6 +57,12 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
   public void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex, 
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    // by the time routing is initiated all task counts must be determined and stable
+    Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+        .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+        + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+        + getContext().getDestinationVertexNumTasks() + " Source: "
+        + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
     destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 31c8064..a607f1b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -26,14 +26,18 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -51,13 +55,45 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.common.collect.Maps;
+
 public class TestEdge {
 
+  @Test (timeout = 5000)
+  public void testOneToOneEdgeManager() {
+    EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
+    when(mockContext.getSourceVertexName()).thenReturn("Source");
+    when(mockContext.getDestinationVertexName()).thenReturn("Destination");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    OneToOneEdgeManager manager = new OneToOneEdgeManager(mockContext);
+    manager.initialize();
+    Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
+    DataMovementEvent event = DataMovementEvent.create(1, null);
+
+    // fail when source and destination are inconsistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
+    try {
+      manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("1-1 source and destination task counts must match"));
+    }
+    
+    // now make it consistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+    manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+    Assert.assertEquals(1, destinationTaskAndInputIndices.size());
+    Assert.assertEquals(1, destinationTaskAndInputIndices.entrySet().iterator().next().getKey()
+        .intValue());
+    Assert.assertEquals(0, destinationTaskAndInputIndices.entrySet().iterator().next().getValue()
+        .get(0).intValue());
+  }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes" })
   @Test (timeout = 5000)
   public void testCompositeEventHandling() throws AMUserCodeException {
     EventHandler eventHandler = mock(EventHandler.class);
@@ -107,7 +143,6 @@ public class TestEdge {
     verifyEvents(srcTAID, destTasks);
   }
 
-  @SuppressWarnings("rawtypes")
   private void verifyEvents(TezTaskAttemptID srcTAID, LinkedHashMap<TezTaskID, Task> destTasks) {
     int count = 0;