You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/06 03:55:50 UTC
tez git commit: TEZ-2036. OneToOneEdgeManager should enforce that
source and destination tasks have same number (bikas) (cherry picked from
commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)
Repository: tez
Updated Branches:
refs/heads/branch-0.6 a7be03981 -> 5ac89e34e
TEZ-2036. OneToOneEdgeManager should enforce that source and destination tasks have same number (bikas)
(cherry picked from commit 5f3a9a3edb51f029bb3754b67e3b91748c993474)
Conflicts: Accepted incoming change
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ac89e34
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ac89e34
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ac89e34
Branch: refs/heads/branch-0.6
Commit: 5ac89e34edc6aa84726974365c16ac5a73df8670
Parents: a7be039
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Feb 5 18:50:42 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Feb 5 18:55:27 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../dag/app/dag/impl/OneToOneEdgeManager.java | 8 ++++
.../apache/tez/dag/app/dag/impl/TestEdge.java | 39 +++++++++++++++++++-
3 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c7d882..db88c25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -128,7 +128,9 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
- TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo
+ TEZ-2036. OneToOneEdgeManager should enforce that source and destination
+ tasks have same number
+ TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
before sending notification
http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index ced481d..11a6483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -27,6 +27,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import com.google.common.base.Preconditions;
+
public class OneToOneEdgeManager extends EdgeManagerPlugin {
List<Integer> destinationInputIndices =
@@ -55,6 +57,12 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
public void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+ // by the time routing is initiated all task counts must be determined and stable
+ Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+ .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+ + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+ + getContext().getDestinationVertexNumTasks() + " Source: "
+ + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5ac89e34/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 31c8064..a607f1b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -26,14 +26,18 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -51,13 +55,45 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import com.google.common.collect.Maps;
+
public class TestEdge {
+ @Test (timeout = 5000)
+ public void testOneToOneEdgeManager() {
+ EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
+ when(mockContext.getSourceVertexName()).thenReturn("Source");
+ when(mockContext.getDestinationVertexName()).thenReturn("Destination");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+ OneToOneEdgeManager manager = new OneToOneEdgeManager(mockContext);
+ manager.initialize();
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
+ DataMovementEvent event = DataMovementEvent.create(1, null);
+
+ // fail when source and destination are inconsistent
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
+ try {
+ manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("1-1 source and destination task counts must match"));
+ }
+
+ // now make it consistent
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+ manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+ Assert.assertEquals(1, destinationTaskAndInputIndices.size());
+ Assert.assertEquals(1, destinationTaskAndInputIndices.entrySet().iterator().next().getKey()
+ .intValue());
+ Assert.assertEquals(0, destinationTaskAndInputIndices.entrySet().iterator().next().getValue()
+ .get(0).intValue());
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
@Test (timeout = 5000)
public void testCompositeEventHandling() throws AMUserCodeException {
EventHandler eventHandler = mock(EventHandler.class);
@@ -107,7 +143,6 @@ public class TestEdge {
verifyEvents(srcTAID, destTasks);
}
- @SuppressWarnings("rawtypes")
private void verifyEvents(TezTaskAttemptID srcTAID, LinkedHashMap<TezTaskID, Task> destTasks) {
int count = 0;