You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/01/31 02:46:26 UTC
tez git commit: TEZ-2011. InputReadyVertexManager not resilient to
updates in parallelism (bikas)
Repository: tez
Updated Branches:
refs/heads/master 6ba1339d5 -> b7268699e
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b7268699
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b7268699
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b7268699
Branch: refs/heads/master
Commit: b7268699e684fc4d1ebc00b47d8f1f7e4163bf97
Parents: 6ba1339
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 17:46:27 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 17:46:27 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 +
.../vertexmanager/InputReadyVertexManager.java | 66 ++++++++++++---
.../TestInputReadyVertexManager.java | 87 +++++++++++++++++++-
4 files changed, 140 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43d009d..5c0bec0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -173,6 +173,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1642. TestAMRecovery sometimes fail.
TEZ-1931. Publish tez version info to Timeline.
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f26e4ae..577c98b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,6 +1560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+ "invoked only after vertexReconfigurationPlanned() is invoked");
this.vertexToBeReconfiguredByManager = false;
+ // TEZ-2015: VM may not have configured everything, e.g. an input edge. maybeSendConfiguredEvent()
if (completelyConfiguredSent.compareAndSet(false, true)) {
// vertex already started and at that time this event was not sent. Send now.
stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 11185ee..f5c187e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.library.vertexmanager;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -32,11 +34,16 @@ import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
@Private
public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -48,6 +55,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
int oneToOneSrcTasksDoneCount[];
TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
+ int numSignalsToWaitFor;
+ Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
public InputReadyVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -67,12 +76,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
}
}
- @Override
- public void initialize() {
- }
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ void start() {
+ if (!ready()) {
+ return;
+ }
int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
taskIsStarted = new boolean[numManagedTasks];
@@ -117,24 +124,61 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
}
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+ for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
for (Integer task : entry.getValue()) {
handleSourceTaskFinished(entry.getKey(), task);
}
}
}
+
+ boolean ready() {
+ int target = getContext().getInputVertexEdgeProperties().size() + 1;
+ Preconditions.checkState(numSignalsToWaitFor <= target);
+ return (numSignalsToWaitFor == target);
+ }
+
+ @Override
+ public void initialize() {
+ Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+ // wait for every source vertex to be configured and for this vertex to start
+ numSignalsToWaitFor = 0;
+ for (String entry : edges.keySet()) {
+ getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+ }
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ numSignalsToWaitFor++;
+ LOG.info("Received configured signal from: " + stateUpdate.getVertexName() +
+ " numConfiguredSources: " + numSignalsToWaitFor);
+ start();
+ }
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- handleSourceTaskFinished(srcVertexName, taskId);
+ public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+ for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+ pendingCompletions.putAll(entry.getKey(), entry.getValue());
+ }
+ numSignalsToWaitFor++;
+ start();
+ }
+
+ @Override
+ public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ if (ready()) {
+ handleSourceTaskFinished(srcVertexName, taskId);
+ } else {
+ pendingCompletions.put(srcVertexName, taskId);
+ }
}
@Override
- public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+ public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
}
@Override
- public void onRootVertexInitialized(String inputName,
+ public synchronized void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b7268699/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index c6981ed..9a83a51 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,7 +57,7 @@ public class TestInputReadyVertexManager {
}
@Test (timeout=5000)
- public void testBasicScatterGather() {
+ public void testBasicScatterGather() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -80,6 +82,9 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ // first source vertex configured
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ // then own vertex started
manager.onVertexStarted(initialCompletions);
manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
verify(mockContext, times(0)).scheduleVertexTasks(anyList());
@@ -89,7 +94,7 @@ public class TestInputReadyVertexManager {
}
@Test (timeout=5000)
- public void testBasicOneToOne() {
+ public void testBasicOneToOne() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -114,7 +119,10 @@ public class TestInputReadyVertexManager {
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ // first own vertex started
manager.onVertexStarted(initialCompletions);
+ // then source vertex configured
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -139,9 +147,56 @@ public class TestInputReadyVertexManager {
Assert.assertEquals(2, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex());
}
+
+ @Test (timeout=5000)
+ public void testDelayedConfigureOneToOne() throws Exception {
+ HashMap<String, EdgeProperty> mockInputVertices =
+ new HashMap<String, EdgeProperty>();
+ String mockSrcVertexId1 = "Vertex1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.ONE_TO_ONE,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ String mockManagedVertexId = "Vertex";
+
+ VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+ mockInputVertices.put(mockSrcVertexId1, eProp1);
+
+ Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+ initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+ InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+ manager.initialize();
+ // first own vertex started
+ manager.onVertexStarted(initialCompletions);
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+ verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ // then source vertex configured. now we start
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+ Assert.assertEquals(2, requestCaptor.getAllValues().size());
+ Assert.assertEquals(1, requestCaptor.getValue().size());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+ verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+ Assert.assertEquals(1, requestCaptor.getValue().size());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getVertexName());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0)
+ .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
+ }
@Test (timeout=5000)
- public void testComplex() {
+ public void testComplex() throws Exception {
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
@@ -192,22 +247,43 @@ public class TestInputReadyVertexManager {
Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- // 1-1 sources do not match managed tasks
+ // 1-1 sources do not match managed tasks before vertex started
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
try {
manager.onVertexStarted(initialCompletions);
Assert.assertTrue("Should have exception", false);
} catch (TezUncheckedException e) {
e.getMessage().contains("Managed task number must equal 1-1 source");
}
+
+ // 1-1 sources do not match managed tasks after vertex started
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+ manager = new InputReadyVertexManager(mockContext);
+ manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStarted(initialCompletions);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+ try {
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+ Assert.assertTrue("Should have exception", false);
+ } catch (TezUncheckedException e) {
+ e.getMessage().contains("Managed task number must equal 1-1 source");
+ }
// 1-1 sources do not match
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
try {
manager.onVertexStarted(initialCompletions);
Assert.assertTrue("Should have exception", false);
@@ -220,6 +296,9 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
manager.onVertexStarted(initialCompletions);
// all 1-1 0's done but not scheduled because v1 is not done
manager.onSourceTaskCompleted(mockSrcVertexId3, 0);