You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/12 08:32:13 UTC
[tez] branch branch-0.9 updated: TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 5461160 TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)
5461160 is described below
commit 5461160214dab43c54a132160af66eecf396bf13
Author: Syed Shameerur Rahman <sr...@qubole.com>
AuthorDate: Tue May 12 10:30:18 2020 +0200
TEZ-4173: isSetParallelismCalled should be checked before skipping vertex reinit (Syed Shameerur Rahman via László Bodor)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 ++--
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 42 ++++++++++++++++++++--
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 26ec7e9..536d51d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2825,10 +2825,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
// - Why using VertexReconfigureDoneEvent
// - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
// VertexReconfigureDoneEvent will be logged
- // - TaskStartEvent is seen in that vertex
+ // - TaskStartEvent is seen in that vertex or setVertexParallelism is called
// - All the parent vertices have skipped initializing stage while recovering
if (recoveryData != null && recoveryData.shouldSkipInit()
- && recoveryData.isVertexTasksStarted() && isVertexInitSkippedInParentVertices()) {
+ && (recoveryData.isVertexTasksStarted() ||
+ recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled())
+ && isVertexInitSkippedInParentVertices()) {
// Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
if (LOG.isInfoEnabled()) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index fcf6db8..95ea8a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -746,7 +746,7 @@ public class TestDAGRecovery {
"vertex1", 0L, v1InitedTime,
v1NumTask, "", null, inputGeneratedTezEvents, null);
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
- 0L, v1NumTask, null, null, null, true);
+ 0L, v1NumTask, null, null, null, false);
VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
v1ReconfigureDoneEvent, null, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
@@ -784,7 +784,7 @@ public class TestDAGRecovery {
"vertex1", 0L, v1InitedTime,
v1NumTask, "", null, inputGeneratedTezEvents, null);
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
- 0L, v1NumTask, null, null, null, true);
+ 0L, v1NumTask, null, null, null, false);
VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
@@ -807,6 +807,44 @@ public class TestDAGRecovery {
/**
* RecoveryEvents:
* DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> setParallelismCalledFlag
+ *
+ * V1 skip initialization.
+ */
+ @Test(timeout=5000)
+ public void testVertexRecoverWithSetParallelismCalledFlag() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, null, true);
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+ VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+ doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(DAGState.RUNNING, dag.getState());
+ // v1 skip initialization
+ assertEquals(VertexState.RUNNING, v1.getState());
+ assertEquals(v1InitedTime, v1.initedTime);
+ assertEquals(v1StartedTime, v1.startedTime);
+ assertEquals(v1NumTask, v1.getTotalTasks());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.RUNNING, v3.getState());
+ }
+
+ /**
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
* V1: VertexReconfigrationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent -> VertexTaskStart
*
* V1 skip initialization.