You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/04 20:13:40 UTC
tez git commit: TEZ-2015. VertexImpl.doneReconfiguringVertex() should
check other criteria before sending notification (bikas) (cherry picked from
commit 7096d8a8f409e3db4f568230a959432e2c9cbb78)
Repository: tez
Updated Branches:
refs/heads/branch-0.6 8cdd988b2 -> fbd240f4e
TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria before sending notification (bikas)
(cherry picked from commit 7096d8a8f409e3db4f568230a959432e2c9cbb78)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fbd240f4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fbd240f4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fbd240f4
Branch: refs/heads/branch-0.6
Commit: fbd240f4e08cd47f842a2dd98106835ff5f1954d
Parents: 8cdd988
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 4 11:10:23 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 4 11:12:47 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 72 ++++++++++++++++++++
3 files changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 796ec9a..d369d38 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,8 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
+ before sending notification
TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1642. TestAMRecovery sometimes fail.
http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a467e4b..506bbc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,11 +1560,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
+ "invoked only after vertexReconfigurationPlanned() is invoked");
this.vertexToBeReconfiguredByManager = false;
- // TEZ-2015 VM may not have configured everything eg. input edge. maybeSendConfiguredEvent()
- if (completelyConfiguredSent.compareAndSet(false, true)) {
- // vertex already started and at that time this event was not sent. Send now.
- stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,
- org.apache.tez.dag.api.event.VertexState.CONFIGURED));
+ if (canInitVertex()) {
+ maybeSendConfiguredEvent();
+ } else {
+ Preconditions.checkState(getInternalState() == VertexState.INITIALIZING, "Vertex: "
+ + getLogIdentifier());
}
} finally {
writeLock.unlock();
@@ -3298,7 +3298,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void maybeSendConfiguredEvent() {
// the vertex is fully configured by the time it starts. Always notify completely configured
// unless the vertex manager has told us that it is going to reconfigure it further
- Preconditions.checkState(canInitVertex());
+ Preconditions.checkState(canInitVertex(), "Vertex: " + getLogIdentifier());
if (!this.vertexToBeReconfiguredByManager) {
// this vertex will not be reconfigured by its manager
if (completelyConfiguredSent.compareAndSet(false, true)) {
http://git-wip-us.apache.org/repos/asf/tez/blob/fbd240f4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 645708e..e94bb17 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4949,6 +4949,78 @@ public class TestVertexImpl {
Assert.assertNotNull(vB.getTask(0));
Assert.assertNotNull(vC.getTask(0));
}
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
+ // Race when a source vertex manages to start before the target vertex has
+ // been initialized
+ setupPreDagCreation();
+ dagPlan = createSamplerDAGPlan(true);
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ TestUpdateListener listener = new TestUpdateListener();
+ updateTracker
+ .registerForVertexUpdates(vB.getName(),
+ EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED),
+ listener);
+
+ // fudge the vm so we can do custom stuff
+ vB.vertexManager = new VertexManager(
+ VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
+ vB, appContext, mock(StateChangeNotifier.class));
+
+ vB.vertexReconfigurationPlanned();
+
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.getEventHandler().handle(new VertexEvent(vA.getVertexId(),
+ VertexEventType.V_START));
+
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITIALIZING, vA.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ // setting the edge manager should cause vA to start
+ EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
+ EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+ Edge e = vC.sourceVertices.get(vA);
+ Assert.assertNull(e.getEdgeManager());
+ e.setCustomEdgeManager(mockEdgeManagerDescriptor);
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vB.getState());
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ // vB is not configured yet. Edge to C is not configured. So it should not send configured event
+ // even though the VM says it is done reconfiguring the vertex
+ vB.doneReconfiguringVertex();
+ Assert.assertEquals(0, listener.events.size());
+
+ // complete configuration and verify getting configured signal from vB
+ Map<String, EdgeManagerPluginDescriptor> edges = Maps.newHashMap();
+ edges.put("B", mockEdgeManagerDescriptor);
+ vC.setParallelism(2, vertexLocationHint, edges, null, true);
+
+ dispatcher.await();
+ Assert.assertEquals(1, listener.events.size());
+ Assert.assertEquals(vB.getName(), listener.events.get(0).getVertexName());
+ Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
+ listener.events.get(0).getVertexState());
+ updateTracker.unregisterForVertexUpdates(vB.getName(), listener);
+
+ Assert.assertEquals(VertexState.RUNNING, vA.getState());
+ Assert.assertEquals(VertexState.RUNNING, vB.getState());
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+ Assert.assertNotNull(vA.getTask(0));
+ Assert.assertNotNull(vB.getTask(0));
+ Assert.assertNotNull(vC.getTask(0));
+ }
@SuppressWarnings("unchecked")
@Test(timeout = 5000)