You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:23 UTC
[3/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign
(zjffdu)
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
deleted file mode 100644
index 1aba5fa..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventHandler;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class TestTaskAttemptRecovery {
-
- private TaskAttemptImpl ta;
- private EventHandler mockEventHandler;
- private long creationTime = System.currentTimeMillis();
- private long allocationTime = creationTime + 5000;
- private long startTime = allocationTime + 5000;
- private long finishTime = startTime + 5000;
-
- private TezTaskAttemptID taId;
- private String vertexName = "v1";
-
- private AppContext mockAppContext;
- private MockHistoryEventHandler mockHistoryEventHandler;
- private Task mockTask;
- private Vertex mockVertex;
-
- public static class MockHistoryEventHandler extends HistoryEventHandler {
-
- private List<DAGHistoryEvent> events;
-
- public MockHistoryEventHandler(AppContext context) {
- super(context);
- events = new ArrayList<DAGHistoryEvent>();
- }
-
- @Override
- public void handle(DAGHistoryEvent event) {
- events.add(event);
- }
-
- @Override
- public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
- events.add(event);
- }
-
- void verfiyTaskAttemptFinishedEvent(TezTaskAttemptID taId, TaskAttemptState finalState, int expectedTimes) {
- int actualTimes = 0;
- for (DAGHistoryEvent event : events) {
- if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
- TaskAttemptFinishedEvent tfEvent = (TaskAttemptFinishedEvent)event.getHistoryEvent();
- if (tfEvent.getTaskAttemptID().equals(taId) &&
- tfEvent.getState().equals(finalState)) {
- actualTimes ++;
- }
- }
- }
- assertEquals(expectedTimes, actualTimes);
- }
-
- void verifyTaskFinishedEvent(TezTaskID taskId, TaskState finalState, int expectedTimes) {
- int actualTimes = 0;
- for (DAGHistoryEvent event : events) {
- if (event.getHistoryEvent().getEventType() == HistoryEventType.TASK_FINISHED) {
- TaskFinishedEvent tfEvent = (TaskFinishedEvent)event.getHistoryEvent();
- if (tfEvent.getTaskID().equals(taskId) && tfEvent.getState().equals(finalState)) {
- actualTimes ++;
- }
- }
- }
- assertEquals(expectedTimes, actualTimes);
- }
- }
-
- @Before
- public void setUp() {
- mockTask = mock(Task.class);
- mockVertex = mock(Vertex.class);
- when(mockTask.getVertex()).thenReturn(mockVertex);
- mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class))
- .getTask(any(TezTaskID.class)))
- .thenReturn(mockTask);
- mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
- when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
- mockEventHandler = mock(EventHandler.class);
- TezTaskID taskId =
- TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
- ta =
- new TaskAttemptImpl(taskId, 0, mockEventHandler,
- mock(TaskCommunicatorManagerInterface.class), new Configuration(),
- new SystemClock(), mock(TaskHeartbeatHandler.class),
- mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class), false, mockTask);
- taId = ta.getID();
- }
-
- private void restoreFromTAStartEvent() {
- TaskAttemptState recoveredState =
- ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- startTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- assertEquals(startTime, ta.getLaunchTime());
- assertEquals(TaskAttemptState.RUNNING, recoveredState);
- }
-
- private void restoreFromTAFinishedEvent(TaskAttemptState state) {
- String diag = "test_diag";
- TezCounters counters = mock(TezCounters.class);
- TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
-
- TaskAttemptTerminationCause errorEnum = null;
- if (state != TaskAttemptState.SUCCEEDED) {
- errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
- }
-
- long lastDataEventTime = 1024;
- TezTaskAttemptID lastDataEventTA = mock(TezTaskAttemptID.class);
- List<DataEventDependencyInfo> events = Lists.newLinkedList();
- events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
- events.add(new DataEventDependencyInfo(lastDataEventTime, lastDataEventTA));
- TaskAttemptState recoveredState =
- ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, errorEnum, diag, counters, events, creationTime,
- causalId, allocationTime));
- assertEquals(causalId, ta.getCreationCausalAttempt());
- assertEquals(creationTime, ta.getCreationTime());
- assertEquals(allocationTime, ta.getAllocationTime());
- assertEquals(startTime, ta.getLaunchTime());
- assertEquals(finishTime, ta.getFinishTime());
- assertEquals(counters, ta.reportedStatus.counters);
- assertEquals(1.0f, ta.reportedStatus.progress, 1e-6);
- assertEquals(state, ta.reportedStatus.state);
- assertEquals(1, ta.getDiagnostics().size());
- assertEquals(diag, ta.getDiagnostics().get(0));
- assertEquals(state, recoveredState);
- assertEquals(events.size(), ta.lastDataEvents.size());
- assertEquals(lastDataEventTime, ta.lastDataEvents.get(0).getTimestamp());
- assertEquals(lastDataEventTA, ta.lastDataEvents.get(0).getTaskAttemptId());
- if (state != TaskAttemptState.SUCCEEDED) {
- assertEquals(errorEnum, ta.getTerminationCause());
- } else {
- assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause());
- }
- }
-
- private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
- int expectedTimes) {
- int actualTimes = 0;
- for (Event event : events) {
- if (eventClass.isInstance(event)) {
- actualTimes ++;
- }
- }
- assertEquals(expectedTimes, actualTimes);
- }
-
- /**
- * No event to restore -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testTARecovery_NEW() {
- ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
- assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- assertEquals(2, events.size());
- verifyEvents(events, TaskEventTAUpdate.class, 1);
- // one for task killed
- verifyEvents(events, DAGEventCounterUpdate.class, 1);
- }
-
- /**
- * restoreFromTAStartEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testTARecovery_START() {
- restoreFromTAStartEvent();
-
- ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
- assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(3)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- assertEquals(3, events.size());
- verifyEvents(events, TaskEventTAUpdate.class, 1);
- // one for task launch, one for task killed
- verifyEvents(events, DAGEventCounterUpdate.class, 2);
-
- mockHistoryEventHandler.verfiyTaskAttemptFinishedEvent(taId, TaskAttemptState.KILLED, 1);
- }
-
- /**
- * restoreFromTAStartEvent -> restoreFromTAFinished (SUCCEED)
- * -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testTARecovery_SUCCEED() {
- restoreFromTAStartEvent();
- restoreFromTAFinishedEvent(TaskAttemptState.SUCCEEDED);
-
- ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
- assertEquals(TaskAttemptStateInternal.SUCCEEDED, ta.getInternalState());
-
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- assertEquals(2, events.size());
- // one for task launch, one for task succeeded
- verifyEvents(events, DAGEventCounterUpdate.class, 2);
- }
-
- /**
- * restoreFromTAStartEvent -> restoreFromTAFinished (KILLED)
- * -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testTARecovery_KIILED() {
- restoreFromTAStartEvent();
- restoreFromTAFinishedEvent(TaskAttemptState.KILLED);
-
- ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
- assertEquals(TaskAttemptStateInternal.KILLED, ta.getInternalState());
-
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- assertEquals(2, events.size());
- // one for task launch, one for task killed
- verifyEvents(events, DAGEventCounterUpdate.class, 2);
- }
-
- /**
- * restoreFromTAStartEvent -> restoreFromTAFinished (FAILED)
- * -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testTARecovery_FAILED() {
- restoreFromTAStartEvent();
- restoreFromTAFinishedEvent(TaskAttemptState.FAILED);
-
- ta.handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_RECOVER));
- assertEquals(TaskAttemptStateInternal.FAILED, ta.getInternalState());
-
- ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class);
- verify(mockEventHandler, times(2)).handle(eventCaptor.capture());
- List<Event> events = eventCaptor.getAllValues();
- assertEquals(2, events.size());
- // one for task launch, one for task failed
- verifyEvents(events, DAGEventCounterUpdate.class, 2);
- }
-
- /**
- * restoreFromTAFinishedEvent (killed before started)
- */
- @Test(timeout = 5000)
- public void testRecover_FINISH_BUT_NO_START() {
- TaskAttemptState recoveredState =
- ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, TaskAttemptState.KILLED,
- TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskAttemptState.KILLED, recoveredState);
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 24c9664..0414c99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -163,7 +163,7 @@ public class TestTaskImpl {
}
private void scheduleTaskAttempt(TezTaskID taskId) {
- mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint));
+ mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint, false));
assertTaskScheduledState();
assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
assertEquals(locationHint, mockTask.getTaskLocationHint());
@@ -762,8 +762,7 @@ public class TestTaskImpl {
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
- schedCausalTA);
+ appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
deleted file mode 100644
index bea423a..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ /dev/null
@@ -1,873 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.VertexStatus.State;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.StateChangeNotifier;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
-import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.impl.TestTaskAttemptRecovery.MockHistoryEventHandler;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.OutputCommitter;
-import org.apache.tez.runtime.api.OutputCommitterContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestTaskRecovery {
-
- private TaskImpl task;
- private DrainDispatcher dispatcher;
-
- private int taskAttemptCounter = 0;
-
- private Configuration conf = new Configuration();
- private AppContext mockAppContext;
- private MockHistoryEventHandler mockHistoryEventHandler;
- private ApplicationId appId = ApplicationId.newInstance(
- System.currentTimeMillis(), 1);
- private TezDAGID dagId = TezDAGID.getInstance(appId, 1);
- private TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
- private Vertex vertex;
- private String vertexName = "v1";
- private long taskScheduledTime = 100L;
- private long taskStartTime = taskScheduledTime + 100L;
- private long taskFinishTime = taskStartTime + 100L;
- private TaskAttemptEventHandler taEventHandler =
- new TaskAttemptEventHandler();
-
- private class TaskEventHandler implements EventHandler<TaskEvent> {
- @Override
- public void handle(TaskEvent event) {
- task.handle(event);
- }
- }
-
- private class TaskAttemptEventHandler implements
- EventHandler<TaskAttemptEvent> {
-
- private List<TaskAttemptEvent> events = Lists.newArrayList();
-
- @Override
- public void handle(TaskAttemptEvent event) {
- events.add(event);
- ((TaskAttemptImpl) task.getAttempt(event.getTaskAttemptID()))
- .handle(event);
- }
-
- public List<TaskAttemptEvent> getEvents() {
- return events;
- }
- }
-
- private class TestOutputCommitter extends OutputCommitter {
-
- boolean recoverySupported = false;
- boolean throwExceptionWhenRecovery = false;
-
- public TestOutputCommitter(OutputCommitterContext committerContext,
- boolean recoverySupported, boolean throwExceptionWhenRecovery) {
- super(committerContext);
- this.recoverySupported = recoverySupported;
- this.throwExceptionWhenRecovery = throwExceptionWhenRecovery;
- }
-
- @Override
- public void recoverTask(int taskIndex, int previousDAGAttempt)
- throws Exception {
- if (throwExceptionWhenRecovery) {
- throw new Exception("fail recovery Task");
- }
- }
-
- @Override
- public boolean isTaskRecoverySupported() {
- return recoverySupported;
- }
-
- @Override
- public void initialize() throws Exception {
-
- }
-
- @Override
- public void setupOutput() throws Exception {
-
- }
-
- @Override
- public void commitOutput() throws Exception {
-
- }
-
- @Override
- public void abortOutput(State finalState) throws Exception {
-
- }
-
- }
-
- @Before
- public void setUp() {
- dispatcher = new DrainDispatcher();
- dispatcher.register(DAGEventType.class, mock(EventHandler.class));
- dispatcher.register(VertexEventType.class, mock(EventHandler.class));
- dispatcher.register(TaskEventType.class, new TaskEventHandler());
- dispatcher.register(TaskAttemptEventType.class, taEventHandler);
- dispatcher.init(new Configuration());
- dispatcher.start();
-
- vertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
- when(vertex.getProcessorDescriptor().getClassName()).thenReturn("");
-
- mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- when(mockAppContext.getCurrentDAG().getVertex(any(TezVertexID.class)))
- .thenReturn(vertex);
- mockHistoryEventHandler = new MockHistoryEventHandler(mockAppContext);
- when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
- task =
- new TaskImpl(vertexId, 0, dispatcher.getEventHandler(),
- new Configuration(), mock(TaskCommunicatorManagerInterface.class),
- new SystemClock(), mock(TaskHeartbeatHandler.class),
- mockAppContext, false, Resource.newInstance(1, 1),
- mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);
-
- Map<String, OutputCommitter> committers =
- new HashMap<String, OutputCommitter>();
- committers.put("out1", new TestOutputCommitter(
- mock(OutputCommitterContext.class), true, false));
- when(task.getVertex().getOutputCommitters()).thenReturn(committers);
- }
-
- private void restoreFromTaskStartEvent() {
- TaskState recoveredState =
- task.restoreFromEvent(new TaskStartedEvent(task.getTaskId(),
- vertexName, taskScheduledTime, taskStartTime));
- assertEquals(TaskState.SCHEDULED, recoveredState);
- assertEquals(0, task.getFinishedAttemptsCount());
- assertEquals(taskScheduledTime, task.scheduledTime);
- assertEquals(0, task.getAttempts().size());
- }
-
- private void restoreFromFirstTaskAttemptStartEvent(TezTaskAttemptID taId) {
- long taStartTime = taskStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(0, task.getFinishedAttemptsCount());
- assertEquals(taskScheduledTime, task.scheduledTime);
- assertEquals(1, task.getAttempts().size());
- assertEquals(TaskAttemptStateInternal.NEW,
- ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
- assertEquals(1, task.getUncompletedAttemptsCount());
- }
-
- /**
- * New -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_New() {
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.NEW, task.getInternalState());
- }
-
- /**
- * -> restoreFromTaskFinishEvent ( no TaskStartEvent )
- */
- @Test(timeout = 5000)
- public void testRecovery_NoStartEvent() {
- try {
- task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
- taskStartTime, taskFinishTime, null, TaskState.SUCCEEDED, "",
- new TezCounters(), 0));
- fail("Should fail due to no TaskStartEvent before TaskFinishEvent");
- } catch (Throwable e) {
- assertTrue(e.getMessage().contains(
- "Finished Event seen but"
- + " no Started Event was encountered earlier"));
- }
- }
-
- /**
- * -> restoreFromTaskFinishEvent (KILLED, no TaskStartEvent): expected to be accepted without an exception
- */
- @Test(timeout = 5000)
- public void testRecoveryNewToKilled_NoStartEvent() {
- task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(), vertexName,
- taskStartTime, taskFinishTime, null, TaskState.KILLED, "",
- new TezCounters(), 0));
- }
-
- /**
- * restoreFromTaskStartedEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_Started() {
- restoreFromTaskStartEvent();
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // new task attempt is scheduled
- assertEquals(1, task.getAttempts().size());
- assertEquals(0, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (KILLED) ->
- * RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OnlyTAFinishedEvent_KILLED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null, 0, null, 0));
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // wait for the second task attempt to be scheduled
- dispatcher.await();
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (FAILED) ->
- * RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OnlyTAFinishedEvent_FAILED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null, 0, null, 0));
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // wait for the second task attempt to be scheduled
- dispatcher.await();
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // taskAttempt_1 is recovered to FAILED, and new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(1, task.failedAttempts);
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
- * RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OnlyTAFinishedEvent_SUCCEEDED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- try {
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null, 0, null, 0));
- fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
- } catch (TezUncheckedException e) {
- assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
- }
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // wait for the second task attempt to be scheduled
- dispatcher.await();
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_SUCCEEDED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
- mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (FAILED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_FAILED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(1, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(1, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_KILLED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
- * restoreFromTaskFinishedEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_SUCCEEDED_Finished() {
-
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- recoveredState =
- task.restoreFromEvent(new TaskFinishedEvent(task.getTaskId(),
- vertexName, taskStartTime, taskFinishTime, taId,
- TaskState.SUCCEEDED, "", new TezCounters(), 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(taId, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
- mockHistoryEventHandler.verifyTaskFinishedEvent(task.getTaskId(), TaskState.SUCCEEDED, 1);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
- * restoreFromTaskAttemptFinishedEvent (Failed due to output_failure)
- * restoreFromTaskFinishedEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_SUCCEEDED_FAILED() {
-
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure.
- recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(1, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(1, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
- * restoreFromTaskAttemptFinishedEvent (KILLED due to node failed )
- * restoreFromTaskFinishedEvent -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_SUCCEEDED_KILLED() {
-
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure.
- recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_Commit_Failed_Recovery_Not_Supported() {
- Map<String, OutputCommitter> committers =
- new HashMap<String, OutputCommitter>();
- committers.put("out1", new TestOutputCommitter(
- mock(OutputCommitterContext.class), false, false));
- when(task.getVertex().getOutputCommitters()).thenReturn(committers);
-
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- // restoreFromTaskAttemptFinishedEvent (SUCCEEDED)
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (SUCCEEDED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_Commit_Failed_recover_fail() {
- Map<String, OutputCommitter> committers =
- new HashMap<String, OutputCommitter>();
- committers.put("out1", new TestOutputCommitter(
- mock(OutputCommitterContext.class), true, true));
- when(task.getVertex().getOutputCommitters()).thenReturn(committers);
-
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
-
- // restoreFromTaskAttemptFinishedEvent (SUCCEEDED)
- long taStartTime = taskStartTime + 100L;
- long taFinishTime = taStartTime + 100L;
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.SUCCEEDED, recoveredState);
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(taId, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- @Test(timeout = 5000)
- public void testRecovery_WithDesired_SUCCEEDED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
- task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.SUCCEEDED,
- false));
- assertEquals(TaskStateInternal.SUCCEEDED, task.getInternalState());
- // no TA_Recovery event sent
- assertEquals(0, taEventHandler.getEvents().size());
- }
-
- @Test(timeout = 5000)
- public void testRecovery_WithDesired_FAILED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
- task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.FAILED,
- false));
- assertEquals(TaskStateInternal.FAILED, task.getInternalState());
- // no TA_Recovery event sent
- assertEquals(0, taEventHandler.getEvents().size());
- }
-
- @Test(timeout = 5000)
- public void testRecovery_WithDesired_KILLED() {
- restoreFromTaskStartEvent();
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- restoreFromFirstTaskAttemptStartEvent(taId);
- task.handle(new TaskEventRecoverTask(task.getTaskId(), TaskState.KILLED,
- false));
- assertEquals(TaskStateInternal.KILLED, task.getInternalState());
- // no TA_Recovery event sent
- assertEquals(0, taEventHandler.getEvents().size());
-
- }
-
- /**
- * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
- * restoreFromTaskAttemptFinishedEvent (KILLED) -> RecoverTransition
- */
- @Test(timeout = 5000)
- public void testRecovery_OneTAStarted_Killed() {
- restoreFromTaskStartEvent();
-
- long taStartTime = taskStartTime + 100L;
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(TaskAttemptStateInternal.NEW,
- ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
- assertEquals(1, task.getAttempts().size());
- assertEquals(0, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- long taFinishTime = taStartTime + 100L;
- recoveredState =
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
- "", new TezCounters(), null, 0, null, 0));
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(TaskAttemptStateInternal.NEW,
- ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
- assertEquals(1, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(0, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // wait for Task send TA_RECOVER to TA and TA complete the RecoverTransition
- dispatcher.await();
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- assertEquals(TaskAttemptStateInternal.KILLED,
- ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
- // new task attempt is scheduled
- assertEquals(2, task.getAttempts().size());
- assertEquals(1, task.getFinishedAttemptsCount());
- assertEquals(0, task.failedAttempts);
- assertEquals(1, task.getUncompletedAttemptsCount());
- assertEquals(null, task.successfulAttempt);
- }
-
- /**
- * n = maxFailedAttempts, in the previous AM attempt, n task attempts are
- * killed. When recovering, it should continue to be in running state and
- * schedule a new task attempt.
- */
- @Test(timeout = 5000)
- public void testTaskRecovery_MultipleAttempts1() {
- int maxFailedAttempts =
- conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
- TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
- restoreFromTaskStartEvent();
-
- for (int i = 0; i < maxFailedAttempts; ++i) {
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.KILLED, null, "", null, null, 0, null, 0));
- }
- assertEquals(maxFailedAttempts, task.getAttempts().size());
- assertEquals(0, task.failedAttempts);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // if the previous task attempt is killed, it should not been take into
- // account when checking whether exceed the max attempts
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- // schedule a new task attempt
- assertEquals(maxFailedAttempts + 1, task.getAttempts().size());
- }
-
- /**
- * n = maxFailedAttempts, in the previous AM attempt, n task attempts are
- * failed. When recovering, it should transit to failed because # of
- * failed_attempt is exceeded.
- */
- @Test(timeout = 5000)
- public void testTaskRecovery_MultipleAttempts2() {
- int maxFailedAttempts =
- conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
- TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
- restoreFromTaskStartEvent();
-
- for (int i = 0; i < maxFailedAttempts; ++i) {
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0));
- }
- assertEquals(maxFailedAttempts, task.getAttempts().size());
- assertEquals(maxFailedAttempts, task.failedAttempts);
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // it should transit to failed because of the failed task attempt in the
- // last application attempt.
- assertEquals(TaskStateInternal.FAILED, task.getInternalState());
- assertEquals(maxFailedAttempts, task.getAttempts().size());
- }
-
- /**
- * n = maxFailedAttempts, in the previous AM attempt, n-1 task attempts are
- * killed. And last task attempt is still in running state. When recovering,
- * the last attempt should transit to killed and task is still in running
- * state and new task attempt is scheduled.
- */
- @Test(timeout = 5000)
- public void testTaskRecovery_MultipleAttempts3() throws InterruptedException {
- int maxFailedAttempts =
- conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
- TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
- restoreFromTaskStartEvent();
-
- for (int i = 0; i < maxFailedAttempts - 1; ++i) {
- TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
- task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", ""));
- task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, null, "", null, null, 0, null, 0));
- }
- assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
- assertEquals(maxFailedAttempts - 1, task.failedAttempts);
-
- TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
- TaskState recoveredState =
- task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
- vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
-
- assertEquals(TaskState.RUNNING, recoveredState);
- assertEquals(TaskAttemptStateInternal.NEW,
- ((TaskAttemptImpl) task.getAttempt(newTaskAttemptId))
- .getInternalState());
- assertEquals(maxFailedAttempts, task.getAttempts().size());
-
- task.handle(new TaskEventRecoverTask(task.getTaskId()));
- // wait until task attempt receive the Recover event from task
- dispatcher.await();
-
- assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
- assertEquals(TaskAttemptStateInternal.KILLED,
- ((TaskAttemptImpl) (task.getAttempt(newTaskAttemptId)))
- .getInternalState());
- assertEquals(maxFailedAttempts - 1, task.failedAttempts);
-
- // new task attempt is added
- assertEquals(maxFailedAttempts + 1, task.getAttempts().size());
- }
-
- private TezTaskAttemptID getNewTaskAttemptID(TezTaskID taskId) {
- return TezTaskAttemptID.getInstance(taskId, taskAttemptCounter++);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 035de32..11c2bf1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -137,8 +137,10 @@ import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
@@ -3459,11 +3461,8 @@ public class TestVertexImpl {
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
-
- dispatcher.getEventHandler().handle(
- new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
- new TaskAttemptFailedEvent("Failed"), new EventMetaData(
- EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID())))));
+ ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+ "diag", TaskAttemptTerminationCause.APPLICATION_ERROR));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause());
@@ -3496,10 +3495,8 @@ public class TestVertexImpl {
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
- dispatcher.getEventHandler().handle(
- new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
- new TaskAttemptFailedEvent("Failed"), new EventMetaData(
- EventProducerConsumerType.INPUT, v.getName(), null, ta.getID())))));
+ ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+ "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause());
@@ -3533,10 +3530,8 @@ public class TestVertexImpl {
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
- dispatcher.getEventHandler().handle(
- new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
- new TaskAttemptFailedEvent("Failed"), new EventMetaData(
- EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID())))));
+ ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
+ "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause());
@@ -6355,17 +6350,17 @@ public class TestVertexImpl {
v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent1)));
dispatcher.await();
assertTrue(v3.pendingTaskEvents.size() != 0);
- ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
- verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
- verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+// ArgumentCaptor<DAGHistoryEvent> argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+// verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+// verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
dispatcher.await();
assertTrue(v3.pendingTaskEvents.size() == 0);
// recovery events is not only handled one time
- argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
- verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
- verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
+// argCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+// verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
+// verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
}
private void verifyHistoryEvents(List<DAGHistoryEvent> events, HistoryEventType eventType, int expectedTimes) {