You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:24 UTC
[4/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign
(zjffdu)
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 409c506..803edf7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -15,602 +15,1149 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.tez.dag.app.dag.impl;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-import java.net.URL;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.TezAbstractEvent;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.VertexStatus.State;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.CallableEvent;
+import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
+import org.apache.tez.dag.app.dag.impl.TestVertexImpl.EventHandlingRootInputInitializer;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGKillRequestEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
-import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
-@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestDAGRecovery {
+ private static final Logger LOG = LoggerFactory.getLogger(TestDAGRecovery.class);
+ private static Configuration conf;
+ private DrainDispatcher dispatcher;
+ private ListeningExecutorService execService;
+ private Credentials fsTokens;
+ private AppContext appContext;
+ private ACLManager aclManager;
+ private ApplicationAttemptId appAttemptId;
+ private TaskEventDispatcher taskEventDispatcher;
+ private VertexEventDispatcher vertexEventDispatcher;
+ private DagEventDispatcher dagEventDispatcher;
+ private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
+ private TaskHeartbeatHandler thh;
+ private Clock clock = new SystemClock();
+ private DAGFinishEventHandler dagFinishEventHandler;
+ private DAGPlan dagPlan;
private DAGImpl dag;
- private EventHandler mockEventHandler;
+ private TezDAGID dagId;
+ private UserGroupInformation ugi;
+ private MockHistoryEventHandler historyEventHandler;
+ private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
+ private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,
+ 10));
+ private DAGRecoveryData dagRecoveryData = mock(DAGRecoveryData.class);
+
+ private TezVertexID v1Id; // first vertex
+ private TezTaskID t1v1Id; // first task of v1
+ private TezTaskAttemptID ta1t1v1Id; // first task attempt of the first task of v1
+ private TezVertexID v2Id;
+ private TezTaskID t1v2Id;
+ private TezTaskAttemptID ta1t1v2Id;
+
+ // Randomized timestamps: each one is its predecessor plus 0-99 ms
+ // (rand.nextInt(100)), so the chain dagInited <= dagStarted <= v1Inited <=
+ // v1Started <= t1Started holds for every run.
+ private Random rand = new Random();
+ private long dagInitedTime = System.currentTimeMillis() + rand.nextInt(100);
+ private long dagStartedTime = dagInitedTime + rand.nextInt(100);
+ private long v1InitedTime = dagStartedTime + rand.nextInt(100);
+ private long v1StartedTime = v1InitedTime + rand.nextInt(100);
+ private int v1NumTask = 10;
+ private long t1StartedTime = v1StartedTime + rand.nextInt(100);
+ private long t1FinishedTime = t1StartedTime + rand.nextInt(100);
+ private long ta1LaunchTime = t1StartedTime + rand.nextInt(100);
+ private long ta1FinishedTime = ta1LaunchTime + rand.nextInt(100);
+
+ /** Routes every dispatched DAGEvent to the DAG under test. */
+ private class DagEventDispatcher implements EventHandler<DAGEvent> {
+ @Override
+ public void handle(DAGEvent event) {
+ dag.handle(event);
+ }
+ }
- private String user = "root";
- private String dagName = "dag1";
+ /**
+ * Routes a TaskEvent to the TaskImpl identified by the event's task ID,
+ * looked up through its owning vertex in the DAG under test.
+ */
+ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @Override
+ public void handle(TaskEvent event) {
+ TaskImpl task = (TaskImpl) dag.getVertex(event.getTaskID().getVertexID())
+ .getTask(event.getTaskID());
+ task.handle(event);
+ }
+ }
- private AppContext mockAppContext;
- private ApplicationId appId = ApplicationId.newInstance(
- System.currentTimeMillis(), 1);
- private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
- private long initTime = 100L;
- private long startTime = initTime + 200L;
- private long commitStartTime = startTime + 200L;
- private long finishTime = commitStartTime + 200L;
- private TezCounters tezCounters = new TezCounters();
+ /**
+ * Routes a TaskAttemptEvent to the TaskAttempt identified by the event's
+ * attempt ID, looked up via vertex -> task -> attempt in the DAG under test.
+ */
+ private class TaskAttemptEventDispatcher implements
+ EventHandler<TaskAttemptEvent> {
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ Vertex vertex = dag.getVertex(event.getTaskAttemptID().getTaskID()
+ .getVertexID());
+ Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
+ TaskAttempt ta = task.getAttempt(event.getTaskAttemptID());
+ // The cast to the parameterized EventHandler is unchecked; keep the
+ // suppression scoped to this single declaration.
+ @SuppressWarnings("unchecked")
+ EventHandler<TaskAttemptEvent> handler = (EventHandler<TaskAttemptEvent>) ta;
+ handler.handle(event);
+ }
+ }
- @Before
- public void setUp() {
- mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- when(mockAppContext.getCurrentDAG().getDagUGI()).thenReturn(null);
- ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
- doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
- mockEventHandler = mock(EventHandler.class);
- tezCounters.findCounter("grp_1", "counter_1").increment(1);
-
- DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
- dag =
- new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
- mock(TaskCommunicatorManagerInterface.class), new Credentials(),
- new SystemClock(), user, mock(TaskHeartbeatHandler.class),
- mockAppContext);
- }
-
- private void assertNewState() {
- assertEquals(0, dag.getVertices().size());
- assertEquals(0, dag.edges.size());
- assertNull(dag.dagScheduler);
- assertFalse(dag.recoveryCommitInProgress);
- assertEquals(0, dag.recoveredGroupCommits.size());
- }
-
- private void restoreFromDAGInitializedEvent() {
- DAGState recoveredState =
- dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
- dagName, null));
- assertEquals(DAGState.INITED, recoveredState);
- assertEquals(initTime, dag.initTime);
- assertEquals(6, dag.getVertices().size());
- assertEquals(6, dag.edges.size());
- assertNotNull(dag.dagScheduler);
- }
-
- private void restoreFromDAGStartedEvent() {
- DAGState recoveredState =
- dag.restoreFromEvent(new DAGStartedEvent(dagId, startTime, user,
- dagName));
- assertEquals(startTime, dag.startTime);
- assertEquals(DAGState.RUNNING, recoveredState);
- }
-
- private void restoreFromDAGCommitStartedEvent() {
- DAGState recoveredState =
- dag.restoreFromEvent(new DAGCommitStartedEvent(dagId, commitStartTime));
- assertTrue(dag.recoveryCommitInProgress);
- assertEquals(DAGState.RUNNING, recoveredState);
- }
-
- private void restoreFromVertexGroupCommitStartedEvent() {
- DAGState recoveredState =
- dag.restoreFromEvent(new VertexGroupCommitStartedEvent(dagId, "g1",
- commitStartTime));
- assertEquals(1, dag.recoveredGroupCommits.size());
- assertFalse(dag.recoveredGroupCommits.get("g1").booleanValue());
- assertEquals(DAGState.RUNNING, recoveredState);
- }
-
- private void restoreFromVertexGroupCommitFinishedEvent() {
- DAGState recoveredState =
- dag.restoreFromEvent(new VertexGroupCommitFinishedEvent(dagId, "g1",
- commitStartTime + 100L));
- assertEquals(1, dag.recoveredGroupCommits.size());
- assertTrue(dag.recoveredGroupCommits.get("g1").booleanValue());
- assertEquals(DAGState.RUNNING, recoveredState);
- }
-
- private void restoreFromDAGFinishedEvent(DAGState finalState) {
- DAGState recoveredState =
- dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
- finalState, "", tezCounters, user, dagName, null, null));
- assertEquals(finishTime, dag.finishTime);
- assertFalse(dag.recoveryCommitInProgress);
- assertEquals(finalState, recoveredState);
- assertEquals(tezCounters, dag.fullCounters);
- }
-
- private void restoreFromDAGKillRequestEvent() {
- dag.restoreFromEvent(new DAGKillRequestEvent(dag.getID(), 0L, false));
+ /**
+ * Routes a VertexEvent to the VertexImpl with the event's vertex ID in the
+ * DAG under test.
+ */
+ private class VertexEventDispatcher implements EventHandler<VertexEvent> {
+
+ @Override
+ public void handle(VertexEvent event) {
+ VertexImpl vertex = (VertexImpl) dag.getVertex(event.getVertexId());
+ vertex.handle(event);
+ }
}
- /**
- * New -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testDAGRecovery_FromNew() {
- assertNewState();
+ /**
+ * Counts the events delivered to it. setup() registers it as the
+ * dispatcher's handler for DAGAppMasterEventType.
+ * NOTE(review): the handler is typed to DAGAppMasterEventDAGFinished, so any
+ * other DAGAppMasterEvent subclass dispatched under that type would fail the
+ * bridge-method cast with ClassCastException -- confirm only DAG-finished
+ * events are sent in these tests.
+ */
+ private class DAGFinishEventHandler implements
+ EventHandler<DAGAppMasterEventDAGFinished> {
+ // Number of events handled so far.
+ public int dagFinishEvents = 0;
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+ @Override
+ public void handle(DAGAppMasterEventDAGFinished event) {
+ ++dagFinishEvents;
+ }
+ }
- ArgumentCaptor<DAGEvent> eventCaptor =
- ArgumentCaptor.forClass(DAGEvent.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<DAGEvent> events = eventCaptor.getAllValues();
- assertEquals(2, events.size());
- assertEquals(DAGEventType.DAG_INIT, events.get(0).getType());
- assertEquals(DAGEventType.DAG_START, events.get(1).getType());
+ /**
+ * Swallows every AMSchedulerEvent delivered to it; setup() registers it for
+ * AMSchedulerEventType so such events have a handler.
+ */
+ private class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> {
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ // Intentionally empty.
+ }
}
+
+ private static class MockHistoryEventHandler extends HistoryEventHandler {
- /**
- * New -> restoreFromDAGKillRequested -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testDAGRecovery_FromNewToKilled() {
- restoreFromDAGKillRequestEvent();
- assertNewState();
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.KILLED, dag.getState());
- assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+
+ // History events captured by handleCriticalEvent, in arrival order.
+ private final List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+
+ public MockHistoryEventHandler(AppContext context) {
+ super(context);
+ }
+
+ /**
+ * Captures the HistoryEvent wrapped by {@code event}; does not delegate to
+ * the superclass.
+ */
+ @Override
+ public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
+ HistoryEvent captured = event.getHistoryEvent();
+ historyEvents.add(captured);
+ }
+
+ /** Returns the live list of captured history events. */
+ public List<HistoryEvent> getHistoryEvents() {
+ return historyEvents;
+ }
+
+ /**
+ * Asserts that exactly {@code expectedTimes} of the captured events have
+ * type {@code eventType}.
+ */
+ public void verifyHistoryEvent(int expectedTimes, HistoryEventType eventType) {
+ int matches = 0;
+ for (HistoryEvent recorded : historyEvents) {
+ if (eventType == recorded.getEventType()) {
+ matches++;
+ }
+ }
+ assertEquals(expectedTimes, matches);
+ }
}
- /**
- * restoreFromDAGInitializedEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testDAGRecovery_FromInited() {
- assertNewState();
- restoreFromDAGInitializedEvent();
+ /**
+ * InputInitializer whose initialize() never returns normally: it sleeps in a
+ * loop until the thread is interrupted. createDAGPlan() installs it as the
+ * controller of vertex1's root input "input1".
+ */
+ public static class MockInputInitializer extends InputInitializer {
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+ public MockInputInitializer(InputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
- assertEquals(DAGState.RUNNING, dag.getState());
- // send recover event to 2 root vertex
- ArgumentCaptor<VertexEvent> eventCaptor =
- ArgumentCaptor.forClass(VertexEvent.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
- assertEquals(2, vertexEvents.size());
- for (VertexEvent vEvent : vertexEvents) {
- assertTrue(vEvent instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
- assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+ @Override
+ public List<Event> initialize() throws Exception {
+ // Sleep forever so the initialization stays blocked; an interrupt
+ // surfaces as InterruptedException from Thread.sleep.
+ while(true) {
+ Thread.sleep(1000);
+ }
+ }
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+ throws Exception {
+ // Intentionally a no-op: incoming events are ignored.
}
}
- /**
- * restoreFromDAGInitializedEvent -> restoreFromDAGKillRequested -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testDAGRecovery_FromInitedToKilled() {
- restoreFromDAGInitializedEvent();
- restoreFromDAGKillRequestEvent();
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.KILLED, dag.getState());
- assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ /**
+ * Runs once before the tests in this class and calls
+ * MockDNSToSwitchMapping.initializeMockRackResolver(), presumably so the
+ * host/rack location hints in the plan resolve without real DNS -- confirm.
+ */
+ @BeforeClass
+ public static void beforeClass() {
+ MockDNSToSwitchMapping.initializeMockRackResolver();
}
- /**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * RecoverTransition
- */
- @Test(timeout = 5000)
- public void testDAGRecovery_FromStarted() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
+ /**
+ * Builds a fresh DAGImpl for every test. The AppContext is a Mockito mock.
+ * The local event dispatchers are registered with a DrainDispatcher. The
+ * executor mock's submit() hands the submitted CallableEvent to that
+ * dispatcher's event handler instead of running it.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Before
+ public void setup() {
+ conf = new Configuration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+ appAttemptId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(100, 1), 1);
+ dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+ Assert.assertNotNull(dagId);
+ dagPlan = createDAGPlan();
+ dispatcher = new DrainDispatcher();
+ fsTokens = new Credentials();
+ appContext = mock(AppContext.class);
+ execService = mock(ListeningExecutorService.class);
+ thh = mock(TaskHeartbeatHandler.class);
+ final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
+
+ // Route the submitted CallableEvent through the dispatcher rather than
+ // executing it, and return a mock future to the caller.
+ Mockito.doAnswer(new Answer<ListenableFuture<Void>>() {
+ @Override
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ CallableEvent e = (CallableEvent) args[0];
+ dispatcher.getEventHandler().handle(e);
+ return mockFuture;
+ }
+ }).when(execService).submit((Callable<Void>) any());
+
+ doReturn(execService).when(appContext).getExecService();
+ historyEventHandler = new MockHistoryEventHandler(appContext);
+ aclManager = new ACLManager("amUser");
+ doReturn(conf).when(appContext).getAMConf();
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+ doReturn(appAttemptId.getApplicationId()).when(appContext)
+ .getApplicationID();
+ doReturn(dagId).when(appContext).getCurrentDAGID();
+ doReturn(historyEventHandler).when(appContext).getHistoryHandler();
+ doReturn(aclManager).when(appContext).getAMACLManager();
+ doReturn(dagRecoveryData).when(appContext).getDAGRecoveryData();
+ dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
+ taskCommunicatorManagerInterface, fsTokens, clock, "user", thh,
+ appContext);
+ dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+ doReturn(dag).when(appContext).getCurrentDAG();
+ ugi = mock(UserGroupInformation.class);
+ doReturn(clusterInfo).when(appContext).getClusterInfo();
+ TaskSchedulerManager mockTaskScheduler = mock(TaskSchedulerManager.class);
+ doReturn(mockTaskScheduler).when(appContext).getTaskScheduler();
+ v1Id = TezVertexID.getInstance(dagId, 0);
+ t1v1Id = TezTaskID.getInstance(v1Id, 0);
+ ta1t1v1Id = TezTaskAttemptID.getInstance(t1v1Id, 0);
+ v2Id = TezVertexID.getInstance(dagId, 1);
+ t1v2Id = TezTaskID.getInstance(v2Id, 0);
+ ta1t1v2Id = TezTaskAttemptID.getInstance(t1v2Id, 0);
+
+ dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
+ taskEventDispatcher = new TaskEventDispatcher();
+ dispatcher.register(TaskEventType.class, taskEventDispatcher);
+ taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
+ dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
+ vertexEventDispatcher = new VertexEventDispatcher();
+ dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+ dagEventDispatcher = new DagEventDispatcher();
+ dispatcher.register(DAGEventType.class, dagEventDispatcher);
+ dagFinishEventHandler = new DAGFinishEventHandler();
+ dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);
+ dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventDispatcher());
+ dispatcher.init(conf);
+ dispatcher.start();
+ doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
+ // NOTE(review): this changes the global log4j root level and teardown()
+ // does not restore it, so it persists for later tests in the same JVM.
+ LogManager.getRootLogger().setLevel(Level.DEBUG);
+ }
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.RUNNING, dag.getState());
- // send recover event to 2 root vertex
- ArgumentCaptor<VertexEvent> eventCaptor =
- ArgumentCaptor.forClass(VertexEvent.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<VertexEvent> vertexEvents = eventCaptor.getAllValues();
- assertEquals(2, vertexEvents.size());
- for (VertexEvent vEvent : vertexEvents) {
- assertTrue(vEvent instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
- assertEquals(VertexState.RUNNING, recoverEvent.getDesiredState());
+ /**
+ * OutputCommitter whose lifecycle methods are all no-ops and whose
+ * isTaskRecoverySupported() returns false. createDAGPlan() installs it as the
+ * committer of vertex2's output "output2".
+ */
+ public static class RecoveryNotSupportedOutputCommitter extends OutputCommitter {
+
+ public RecoveryNotSupportedOutputCommitter(
+ OutputCommitterContext committerContext) {
+ super(committerContext);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public void setupOutput() throws Exception {
+ }
+
+ @Override
+ public void commitOutput() throws Exception {
+ }
+
+ @Override
+ public void abortOutput(State finalState) throws Exception {
+ }
+
+ @Override
+ public boolean isTaskRecoverySupported() {
+ // The point of this class: report task recovery as unsupported.
+ return false;
}
}
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> restoreFromDAGKillRequested
- * RecoverTransition
+ * Builds the three-vertex plan used by these tests:
+ * <pre>
+ *   v1      v2
+ *     \    /
+ *      \  /
+ *       v3
+ * </pre>
+ * <ul>
+ * <li>vertex1: numTasks -1, root input "input1" controlled by
+ * {@link MockInputInitializer}, output "output1" committed by
+ * CountingOutputCommitter, out-edge e1.</li>
+ * <li>vertex2: 1 task, output "output2" committed by
+ * {@link RecoveryNotSupportedOutputCommitter}, out-edge e2.</li>
+ * <li>vertex3: 1 task, output "output3" committed by
+ * CountingOutputCommitter, in-edges e1 and e2.</li>
+ * </ul>
+ * Both edges are SCATTER_GATHER, PERSISTED and SEQUENTIAL.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_FromStartedtoKilled() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGKillRequestEvent();
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.KILLED, dag.getState());
- assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
- // send recover event to all the vertices with desired state of KILLED
- ArgumentCaptor<TezAbstractEvent> eventCaptor =
- ArgumentCaptor.forClass(TezAbstractEvent.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<TezAbstractEvent> events = eventCaptor.getAllValues();
- assertEquals(7, events.size());
- for (int i=0;i<6;++i) {
- TezAbstractEvent vEvent = events.get(i);
- assertTrue(vEvent instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
- assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
- }
- assertTrue(events.get(6) instanceof DAGAppMasterEventDAGFinished);
+ private DAGPlan createDAGPlan() {
+ // Named "plan" so it does not shadow the test's DAGImpl field "dag".
+ DAGPlan plan = DAGPlan
+ .newBuilder()
+ .setName("testverteximpl")
+ .addVertex(
+ VertexPlan
+ .newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(RootInputLeafOutputProto.newBuilder().setName("input1")
+ .setControllerDescriptor(TezEntityDescriptorProto.newBuilder()
+ .setClassName(MockInputInitializer.class.getName()).build()))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host1")
+ .addRack("rack1").build())
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder().setNumTasks(-1)
+ .setVirtualCores(4).setMemoryMb(1024).setJavaOpts("")
+ .setTaskModule("x1.y1").build())
+ .addOutputs(
+ DAGProtos.RootInputLeafOutputProto
+ .newBuilder()
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("output1").build())
+ .setName("output1")
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ CountingOutputCommitter.class.getName())))
+ .addOutEdgeId("e1").build())
+ .addVertex(
+ VertexPlan
+ .newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host2")
+ .addRack("rack2").build())
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder().setNumTasks(1)
+ .setVirtualCores(4).setMemoryMb(1024).setJavaOpts("")
+ .setTaskModule("x2.y2").build())
+ .addOutputs(
+ DAGProtos.RootInputLeafOutputProto
+ .newBuilder()
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("output2").build())
+ .setName("output2")
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ RecoveryNotSupportedOutputCommitter.class.getName())))
+ .addOutEdgeId("e2").build())
+ .addVertex(
+ VertexPlan
+ .newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder().addHost("host3")
+ .addRack("rack3").build())
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder().setNumTasks(1)
+ .setVirtualCores(4).setMemoryMb(1024)
+ .setJavaOpts("foo").setTaskModule("x3.y3").build())
+ .addOutputs(
+ DAGProtos.RootInputLeafOutputProto
+ .newBuilder()
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("output3").build())
+ .setName("output3")
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ CountingOutputCommitter.class.getName())))
+ .addInEdgeId("e1").addInEdgeId("e2").build())
+ .addEdge(
+ EdgePlan
+ .newBuilder()
+ .setEdgeDestination(
+ TezEntityDescriptorProto.newBuilder().setClassName("i2"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(
+ TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL).build())
+ .addEdge(
+ EdgePlan
+ .newBuilder()
+ .setEdgeDestination(
+ TezEntityDescriptorProto.newBuilder().setClassName("i3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(
+ TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL).build())
+ .build();
+
+ return plan;
}
+
+ /**
+ * Per-test cleanup: drains and stops the dispatcher, then stops the DAG's
+ * state-change notifier (when a DAG was built) and clears the DAG references.
+ */
+ @After
+ public void teardown() {
+ // Block until the DrainDispatcher's queue is empty before stopping it.
+ dispatcher.await();
+ dispatcher.stop();
+ execService.shutdownNow();
+ dagPlan = null;
+ DAGImpl current = dag;
+ if (current != null) {
+ current.entityUpdateTracker.stop();
+ }
+ dag = null;
+ }
+
+
+ ////////////////////////////////// DAG Recovery ///////////////////////////////////////////////////
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
+ * RecoveryEvents: SummaryEvent_DAGFinishedEvent(SUCCEEDED)
+ * Recover dag to SUCCEEDED and all of its vertices to SUCCEEDED
+ * <p>
+ * Sends a DAGEventRecoverEvent with desired state SUCCEEDED (carrying the
+ * mocked dagRecoveryData) straight to the DAG, drains the dispatcher, then
+ * checks that the DAG and all three vertices are SUCCEEDED.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_Finished_SUCCEEDED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.SUCCEEDED, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to SUCCEED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
- }
+ @Test(timeout=5000)
+ public void testDAGRecoverFromDesiredSucceeded() {
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.SUCCEEDED, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ // Let every event queued by the recovery be dispatched before asserting.
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+ assertEquals(DAGState.SUCCEEDED, dag.getState());
+ assertEquals(3, dag.getVertices().size());
+ assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex1").getState());
+ assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex2").getState());
+ assertEquals(VertexState.SUCCEEDED, dag.getVertex("vertex3").getState());
+ // DAG#initTime and startTime are not guaranteed to be recovered in this
+ // case, so they are not asserted.
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromDAGFinishedEvent(FAILED) -> RecoverTransition
+ * RecoveryEvents: SummaryEvent_DAGFinishedEvent(FAILED)
+ * Recover dag to FAILED and all of its vertices to FAILED
+ * <p>
+ * Sends a DAGEventRecoverEvent with desired state FAILED (carrying the mocked
+ * dagRecoveryData) straight to the DAG, drains the dispatcher, then checks
+ * that the DAG and all three vertices are FAILED.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_Finished_FAILED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGFinishedEvent(DAGState.FAILED);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.FAILED, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to FAILED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
- }
+ @Test(timeout=5000)
+ public void testDAGRecoverFromDesiredFailed() {
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.FAILED, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ // Let every event queued by the recovery be dispatched before asserting.
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+ assertEquals(DAGState.FAILED, dag.getState());
+ assertEquals(3, dag.getVertices().size());
+ assertEquals(VertexState.FAILED, dag.getVertex("vertex1").getState());
+ assertEquals(VertexState.FAILED, dag.getVertex("vertex2").getState());
+ assertEquals(VertexState.FAILED, dag.getVertex("vertex3").getState());
+ // DAG#initTime and startTime are not guaranteed to be recovered in this
+ // case, so they are not asserted.
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
- * restoreFromDAGFinishedEvent -> RecoverTransition
+ * RecoveryEvents: SummaryEvent_DAGFinishedEvent(KILLED)
+ * Recover the DAG to KILLED and all of its vertices to KILLED.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_Finished_KILLED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGFinishedEvent(DAGState.KILLED);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.KILLED, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to KILLED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
- }
+ @Test(timeout=5000)
+ public void testDAGRecoverFromDesiredKilled() {
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.KILLED, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+ // The desired terminal state is propagated to every vertex.
+ assertEquals(DAGState.KILLED, dag.getState());
+ assertEquals(3, dag.getVertices().size());
+ assertEquals(VertexState.KILLED, dag.getVertex("vertex1").getState());
+ assertEquals(VertexState.KILLED, dag.getVertex("vertex2").getState());
+ assertEquals(VertexState.KILLED, dag.getVertex("vertex3").getState());
+ // DAG#initTime and startTime are not guaranteed to be recovered in this case
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent
- * --> restoreFromDAGKillRequestEvent -->
- * restoreFromDAGFinishedEvent -> RecoverTransition
+ * RecoveryEvents: SummaryEvent_DAGFinishedEvent(ERROR)
+ * Recover the DAG to ERROR and all of its vertices to ERROR.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_Finished_KILLED_WithKillRequest() {
- // same behavior as without DAGKillRequestEvent because DAGFinishedEvent is seen
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGKillRequestEvent();
- restoreFromDAGFinishedEvent(DAGState.KILLED);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.KILLED, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to KILLED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
- }
+ @Test(timeout=5000)
+ public void testDAGRecoverFromDesiredError() {
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, DAGState.ERROR, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState());
+ // The desired terminal state is propagated to every vertex.
+ assertEquals(DAGState.ERROR, dag.getState());
+ assertEquals(3, dag.getVertices().size());
+ assertEquals(VertexState.ERROR, dag.getVertex("vertex1").getState());
+ assertEquals(VertexState.ERROR, dag.getVertex("vertex2").getState());
+ assertEquals(VertexState.ERROR, dag.getVertex("vertex3").getState());
+ // DAG#initTime and startTime are not guaranteed to be recovered in this case
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
- * restoreFromDAGFinishedEvent -> RecoverTransition
+ * RecoveryEvents: DAGSubmittedEvent
+ * No desired state is given, so the DAG is recovered as a normal DAG
+ * execution and is expected to be RUNNING afterwards.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_Finished_ERROR() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGFinishedEvent(DAGState.ERROR);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.ERROR, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to KILLED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.FAILED, recoverEvent.getDesiredState());
- }
+ @Test(timeout=5000)
+ public void testDAGRecoverFromNew() {
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.ERROR, dagFinishedEvent.getDAGState());
+ assertEquals(DAGState.RUNNING, dag.getState());
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromDAG_COMMIT_STARTED -> RecoverTransition
+ * RecoveryEvents: DAGSubmittedEvent, DAGInitializedEvent
+ * Recover it as a normal DAG execution; the init time is restored from the
+ * DAGInitializedEvent.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_COMMIT_STARTED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromDAGCommitStartedEvent();
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.FAILED, dag.getState());
+ @Test(timeout=5000)
+ public void testDAGRecoverFromInited() {
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime,
+ "user", "dagName", null);
+ // stub the recovered DAGInitializedEvent before the recover event is handled
+ doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // recover all the vertices to SUCCEEDED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
- }
+ assertEquals(DAGState.RUNNING, dag.getState());
+ assertEquals(dagInitedTime, dag.initTime);
+ }
+
+ /**
+ * RecoveryEvents: DAGSubmittedEvent, DAGInitializedEvent, DAGStartedEvent
+ * Recover it as a normal DAG execution; both the init time and the start
+ * time are restored from the recovered events.
+ */
+ @Test(timeout=5000)
+ public void testDAGRecoverFromStarted() {
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime,
+ "user", "dagName", null);
+ doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+ DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+ doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+ assertEquals(DAGState.RUNNING, dag.getState());
+ assertEquals(dagInitedTime, dag.initTime);
+ assertEquals(dagStartedTime, dag.startTime);
+ }
+
+ /////////////////////////////// Vertex Recovery /////////////////////////////////////////
+
+ /**
+ * Stubs dagRecoveryData so the DAG itself recovers as initialized and
+ * started (DAGInitializedEvent and DAGStartedEvent). Vertex-level recovery
+ * data is left for each test to stub.
+ */
+ private void initMockDAGRecoveryDataForVertex() {
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime,
+ "user", "dagName", null);
+ DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+ doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+ doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
}
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromDAG_COMMIT_STARTED -> -> restoreFromDAGFinished (SUCCEEDED)->
- * RecoverTransition
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: no recovery events
+ *
+ * V1 is initialized again from scratch (ends up INITIALIZING), while V2
+ * reaches RUNNING and V3 stays INITED.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_COMMIT_STARTED_Finished_SUCCEEDED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
-
- restoreFromDAGCommitStartedEvent();
- restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.SUCCEEDED, dag.getState());
-
- // recover all the vertices to SUCCEED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
- }
-
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+ @Test(timeout=5000)
+ public void testVertexRecoverFromNew() {
+ initMockDAGRecoveryDataForVertex();
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+ assertEquals(DAGState.RUNNING, dag.getState());
+ // v1 has no vertex recovery data stubbed, so it is initialized again
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(VertexState.INITIALIZING, v1.getState());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.INITED, v3.getState());
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromVERTEX_GROUP_COMMIT_STARTED -> RecoverTransition
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexInitializedEvent (no VertexConfigurationDoneEvent)
+ *
+ * Without a recorded configuration-done event, V1 is initialized again.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_GROUP_COMMIT_STARTED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
- restoreFromVertexGroupCommitStartedEvent();
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.FAILED, dag.getState());
+ @Test(timeout=5000)
+ public void testVertexRecoverFromInited() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents);
+ // only the VertexInitializedEvent is present; the remaining recovery slots are null
+ VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+ null, null, null, null, false);
+ doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
- // recover all the vertices to SUCCEEDED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
+ assertEquals(DAGState.RUNNING, dag.getState());
+ // reinitialize v1 again because its VertexManager is not completed
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(VertexState.INITIALIZING, v1.getState());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.INITED, v3.getState());
+ }
+
+ /**
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexConfigurationDoneEvent -> VertexInitializedEvent
+ *
+ * V1 skips initialization: its init time and task count come from the
+ * recovered events and it goes straight to RUNNING.
+ */
+ @Test(timeout=5000)
+ public void testVertexRecoverFromInitedAndReconfigureDone() throws InterruptedException {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents);
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, null, true);
+ VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, null, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+ doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+ // NOTE(review): fixed sleep kept from the original change; presumably it lets
+ // work that outlives dispatcher.await() settle. An interrupt now propagates via
+ // the throws clause and fails the test instead of being swallowed.
+ Thread.sleep(1000);
- }
-
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.FAILED, dagFinishedEvent.getDAGState());
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
+ assertEquals(DAGState.RUNNING, dag.getState());
+ // v1 skips initialization
+ assertEquals(VertexState.RUNNING, v1.getState());
+ assertEquals(v1InitedTime, v1.initedTime);
+ assertEquals(v1NumTask, v1.getTotalTasks());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.RUNNING, v3.getState());
}
-
+
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
- * RecoverTransition
+ * RecoveryEvents:
+ * DAG: DAGInitedEvent -> DAGStartedEvent
+ * V1: VertexConfigurationDoneEvent -> VertexInitializedEvent -> VertexStartedEvent
+ *
+ * V1 skips initialization; its init time, start time and task count are
+ * restored from the recovered events.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_GROUP_COMMIT_STARTED_FINISHED() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
-
- restoreFromVertexGroupCommitStartedEvent();
- restoreFromVertexGroupCommitFinishedEvent();
-
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+ @Test(timeout=5000)
+ public void testVertexRecoverFromStart() {
+ initMockDAGRecoveryDataForVertex();
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents);
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, null, true);
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+ VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+ doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+
+ DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
+ dag.handle(recoveryEvent);
+ dispatcher.await();
+
+ VertexImpl v1 = (VertexImpl)dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl)dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl)dag.getVertex("vertex3");
assertEquals(DAGState.RUNNING, dag.getState());
+ // v1 skips initialization
+ assertEquals(VertexState.RUNNING, v1.getState());
+ assertEquals(v1InitedTime, v1.initedTime);
+ assertEquals(v1StartedTime, v1.startedTime);
+ assertEquals(v1NumTask, v1.getTotalTasks());
+ assertEquals(VertexState.RUNNING, v2.getState());
+ assertEquals(VertexState.RUNNING, v3.getState());
+ }
+
+ /////////////////////////////// Task ////////////////////////////////////////////////////////////
+
+ /**
+ * Stubs dagRecoveryData so the DAG recovers as initialized and started, and
+ * vertex1 recovers as initialized, configuration-done (empty root input specs)
+ * and started, with an empty task recovery map. Task-level recovery data is
+ * left for each test to stub.
+ */
+ private void initMockDAGRecoveryDataForTask() {
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents);
+ Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, rootInputSpecs, true);
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+ VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, new HashMap<TezTaskID, TaskRecoveryData>(), false);
+
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime,
+ "user", "dagName", null);
+ DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+ doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+ doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+ doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
+ }
- // send recover event to 2 root vertex
- verify(mockEventHandler, times(2)).handle(
- any(VertexEventRecoverVertex.class));
- assertEquals(DAGState.RUNNING, dag.getState());
+ /**
+ * RecoveryEvent: TaskFinishedEvent(KILLED)
+ * Recover it to KILLED; the killed task is counted as completed in vertex1.
+ */
+ @Test(timeout=5000)
+ public void testTaskRecoverFromKilled() {
+ initMockDAGRecoveryDataForTask();
+ TaskFinishedEvent taskFinishedEvent = new TaskFinishedEvent(t1v1Id, "v1",
+ 0L, 0L, null, TaskState.KILLED, "", null, 4);
+ // no TaskStartedEvent and no attempt recovery data, only the finish event
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(null, taskFinishedEvent, null);
+ doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+ TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+ assertEquals(TaskStateInternal.KILLED, task.getInternalState());
+ assertEquals(1, vertex1.getCompletedTasks());
+ }
+
+ /**
+ * RecoveryEvent: TaskStartedEvent
+ * Recover it to SCHEDULED (the task has no attempt recovery data).
+ */
+ @Test(timeout=5000)
+ public void testTaskRecoverFromStarted() {
+ initMockDAGRecoveryDataForTask();
+ TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "v1", 0L, 0L);
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent, null, null);
+ doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+ TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+ assertEquals(TaskStateInternal.SCHEDULED, task.getInternalState());
+ }
+
+ /**
+ * RecoveryEvents: TaskStartedEvent -> TaskFinishedEvent(SUCCEEDED), with
+ * ta1t1v1: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED)
+ * Recover the task and its attempt to SUCCEEDED; the task counts as completed
+ * while vertex1 itself stays RUNNING.
+ */
+ @Test(timeout=5000)
+ public void testTaskRecoverFromSucceeded() {
+ initMockDAGRecoveryDataForTask();
+ TaskStartedEvent taskStartedEvent = new TaskStartedEvent(t1v1Id, "v1", 0L, 0L);
+ TaskFinishedEvent taskFinishedEvent = new TaskFinishedEvent(t1v1Id, "v1",
+ 0L, 0L, null, TaskState.SUCCEEDED, "", null, 4);
+ TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+ ta1t1v1Id, "v1", 0L, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+ // The source metadata must name the attempt that produced the event: the
+ // vertex1 attempt being recovered (ta1t1v1Id), matching the "vertex1" source name.
+ EventMetaData metadata = new EventMetaData(EventProducerConsumerType.OUTPUT,
+ "vertex1", "vertex3", ta1t1v1Id);
+ taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", 0L, 0L,
+ TaskAttemptState.SUCCEEDED, null, "", null,
+ null, taGeneratedEvents, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+ Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
+ new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
+ taRecoveryDataMap.put(ta1t1v1Id, taRecoveryData);
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(taskStartedEvent, taskFinishedEvent, taRecoveryDataMap);
+ doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ VertexImpl vertex1 = (VertexImpl) dag.getVertex(v1Id);
+ TaskImpl task = (TaskImpl)vertex1.getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(VertexState.RUNNING, vertex1.getState());
+ assertEquals(1, vertex1.getCompletedTasks());
+ assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+ }
+
+ /////////////////////////////// TaskAttempt Recovery /////////////////////////////////////////////////////
+
+ /**
+ * Stubs dagRecoveryData so the DAG recovers as initialized and started, and
+ * vertex1 recovers as initialized, configuration-done and started. The task
+ * map of vertex1 contains t1v1 with only a TaskStartedEvent.
+ * NOTE(review): dagRecoveryData.getTaskRecoveryData(t1v1Id) is not stubbed
+ * here; the task data is only reachable through the vertex's task map.
+ * Confirm this is intended.
+ */
+ private void initMockDAGRecoveryDataForTaskAttempt() {
+ TaskStartedEvent t1StartedEvent = new TaskStartedEvent(t1v1Id, "vertex1", 0L, t1StartedTime);
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(t1StartedEvent, null, null);
+ Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<TezTaskID, TaskRecoveryData>();
+ taskRecoveryDataMap.put(t1v1Id, taskRecoveryData);
+
+ List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
+ VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
+ "vertex1", 0L, v1InitedTime,
+ v1NumTask, "", null, inputGeneratedTezEvents);
+ Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
+ VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id,
+ 0L, v1NumTask, null, null, rootInputSpecs, true);
+ VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
+ VertexRecoveryData v1RecoveryData = new VertexRecoveryData(v1InitedEvent,
+ v1ReconfigureDoneEvent, v1StartedEvent, null, taskRecoveryDataMap, false);
+
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagId, dagInitedTime,
+ "user", "dagName", null);
+ DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
+ doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
+ doReturn(dagInitedEvent).when(dagRecoveryData).getDAGInitializedEvent();
+ doReturn(dagStartedEvent).when(dagRecoveryData).getDAGStartedEvent();
}
/**
- * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
- * restoreFromVERTEX_GROUP_COMMIT_STARTED -> VERTEX_GROUP_COMMIT_FINISHED ->
- * restoreFromDAG_Finished -> RecoverTransition
+ * RecoveryEvents: TaskAttemptFinishedEvent (FAILED), with no TaskAttemptStartedEvent
+ * Recover it to FAILED; the failure is counted on the task and a new attempt
+ * is scheduled.
*/
- @Test(timeout = 5000)
- public void testDAGRecovery_GROUP_COMMIT_Finished() {
- assertNewState();
- restoreFromDAGInitializedEvent();
- restoreFromDAGStartedEvent();
+ @Test(timeout=5000)
+ public void testTARecoverFromNewToFailed() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
+ TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null,
+ null, null, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.FAILED, taskAttempt.getInternalState());
+ assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, taskAttempt.getTerminationCause());
+ // presumably asserts that no attempt history events are re-emitted for a
+ // fully recovered attempt; verify against verifyHistoryEvent's definition
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(1, task.failedAttempts);
+ // new task attempt is scheduled
+ assertEquals(2, task.getAttempts().size());
+ assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+ }
+
+ /**
+ * RecoveryEvents: TaskAttemptFinishedEvent (KILLED), with no TaskAttemptStartedEvent
+ * Recover it to KILLED; a killed attempt is not counted as a task failure.
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromNewToKilled() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
+ TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
+ null, null, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+ assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, taskAttempt.getTerminationCause());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(0, task.failedAttempts);
+ assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+ }
+
+ /**
+ * RecoveryEvents: TaskAttemptStartedEvent (no finish event)
+ * Recover it to KILLED with cause TERMINATED_AT_RECOVERY; a finished event
+ * is expected to be logged for it.
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromRunning() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, null);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
+ assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taskAttempt.getTerminationCause());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
+ historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+ }
- restoreFromVertexGroupCommitStartedEvent();
- restoreFromVertexGroupCommitFinishedEvent();
- restoreFromDAGFinishedEvent(DAGState.SUCCEEDED);
+ /**
+ * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (SUCCEEDED)
+ * Recover the attempt to SUCCEEDED, which also moves its task to SUCCEEDED;
+ * launch and finish times are restored from the recovered events.
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromSucceeded() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+ taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
+ TaskAttemptState.SUCCEEDED, null, "", null,
+ null, taGeneratedEvents, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
+ assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+ assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+ }
- dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
- assertEquals(DAGState.SUCCEEDED, dag.getState());
- assertEquals(tezCounters, dag.getAllCounters());
- // recover all the vertices to SUCCEEDED
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- int i = 0;
- for (; i < 6; ++i) {
- assertTrue(events.get(i) instanceof VertexEventRecoverVertex);
- VertexEventRecoverVertex recoverEvent =
- (VertexEventRecoverVertex) events.get(i);
- assertEquals(VertexState.SUCCEEDED, recoverEvent.getDesiredState());
- }
+ /**
+ * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (SUCCEEDED)
+ * The attempt is recovered to SUCCEEDED, but the task still schedules a new attempt
+ * because v2's output committer does not support recovery.
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ // set up v2 recovery data
+ // ta1t1v2: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent(SUCCEEDED)
+ // t1v2: TaskStartedEvent
+ // v2: VertexInitializedEvent -> VertexConfigurationDoneEvent -> VertexStartedEvent
+ TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+ ta1t1v2Id, "vertex2", ta1LaunchTime, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
+ EventMetaData metadata = new EventMetaData(EventProducerConsumerType.OUTPUT,
+ "vertex2", "vertex3", ta1t1v2Id);
+ taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), metadata));
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime,
+ TaskAttemptState.SUCCEEDED, null, "", null,
+ null, taGeneratedEvents, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id);
+ Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
+ new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
+ taRecoveryDataMap.put(ta1t1v2Id, taRecoveryData);
+
+ TaskStartedEvent t1StartedEvent = new TaskStartedEvent(t1v2Id, "vertex2", 0L, t1StartedTime);
+ TaskRecoveryData taskRecoveryData = new TaskRecoveryData(t1StartedEvent, null, taRecoveryDataMap);
+ Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<TezTaskID, TaskRecoveryData>();
+ taskRecoveryDataMap.put(t1v2Id, taskRecoveryData);
+ doReturn(taskRecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v2Id);
+
+ VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
+ "vertex2", 0L, v1InitedTime,
+ v1NumTask, "", null, null);
+ VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v2Id,
+ 0L, v1NumTask, null, null, null, false);
+ VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, v1StartedTime);
+ VertexRecoveryData v2RecoveryData = new VertexRecoveryData(v2InitedEvent,
+ v2ReconfigureDoneEvent, v2StartedEvent, null, taskRecoveryDataMap, false);
+ doReturn(v2RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v2Id);
+
+ // Recover the DAG: the SUCCEEDED attempt is kept, but t1v2 must run again.
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v2Id).getTask(t1v2Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v2Id);
+ assertEquals(TaskAttemptStateInternal.SUCCEEDED, taskAttempt.getInternalState());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
+ // the recovered attempt plus one newly scheduled attempt
+ assertEquals(2, task.getAttempts().size());
+ assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+ assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
+ }
- // send DAGAppMasterEventDAGFinished at last
- assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished);
- DAGAppMasterEventDAGFinished dagFinishedEvent =
- (DAGAppMasterEventDAGFinished) events.get(i);
- assertEquals(DAGState.SUCCEEDED, dagFinishedEvent.getDAGState());
+ /**
+ * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (FAILED)
+ * Recover it to FAILED
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromFailed() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
+ TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null,
+ null, null, 0L, null, 0L);
+ TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
+ doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.FAILED, taskAttempt.getInternalState());
+ assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, taskAttempt.getTerminationCause());
+ assertEquals(TaskStateInternal.SCHEDULED, task.getInternalState());
+ assertEquals(2, task.getAttempts().size());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
+ assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
}
+ /**
+ * RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (KILLED)
+ * Recover the attempt to KILLED, keeping its termination cause and recorded times.
+ */
+ @Test(timeout=5000)
+ public void testTARecoverFromKilled() {
+ initMockDAGRecoveryDataForTaskAttempt();
+ TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(
+ ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class),
+ mock(NodeId.class), "", "", "");
+ TaskAttemptFinishedEvent killedEvent = new TaskAttemptFinishedEvent(
+ ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, // NOTE(review): sibling tests pass ta1LaunchTime as 3rd arg; confirm intentional
+ TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
+ null, null, 0L, null, 0L);
+ doReturn(new TaskAttemptRecoveryData(startedEvent, killedEvent))
+ .when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
+
+ dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
+ dispatcher.await();
+
+ TaskImpl recoveredTask = (TaskImpl) dag.getVertex(v1Id).getTask(t1v1Id);
+ TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl) recoveredTask.getAttempt(ta1t1v1Id);
+ assertEquals(TaskAttemptStateInternal.KILLED, recoveredAttempt.getInternalState());
+ assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, recoveredAttempt.getTerminationCause());
+ historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
+ assertEquals(ta1LaunchTime, recoveredAttempt.getLaunchTime());
+ assertEquals(ta1FinishedTime, recoveredAttempt.getFinishTime());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 17295cd..6dd578f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -137,6 +139,7 @@ public class TestTaskAttempt {
mockTask = mock(Task.class);
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+ LogManager.getRootLogger().setLevel(Level.DEBUG);
}
@Test(timeout = 5000)
@@ -1556,7 +1559,7 @@ public class TestTaskAttempt {
Resource resource, ContainerContext containerContext, boolean leafVertex) {
super(taskId, attemptNumber, eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- isRescheduled, resource, containerContext, leafVertex, mockTask);
+ isRescheduled, resource, containerContext, leafVertex, mockTask, null);
when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
}