You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this rendering.
Posted to commits@tez.apache.org by je...@apache.org on 2018/09/18 22:14:58 UTC
tez git commit: TEZ-3972. Tez DAG can hang when a single task fails to fetch (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/master 68f4cf93a -> c852dbecf
TEZ-3972. Tez DAG can hang when a single task fails to fetch (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c852dbec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c852dbec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c852dbec
Branch: refs/heads/master
Commit: c852dbecf5690dbf922d427701b0a3e8e7283f69
Parents: 68f4cf9
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Sep 18 17:14:44 2018 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Sep 18 17:14:44 2018 -0500
----------------------------------------------------------------------
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 108 ++++++++++++++++++-
2 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c852dbec/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6ad41f8..bbec9ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1797,9 +1797,9 @@ public class TaskAttemptImpl implements TaskAttempt,
int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
- float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
- / outputFailedEvent.getConsumerTaskNumber();
-
+ int runningTasks = attempt.appContext.getCurrentDAG().getVertex(
+ failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
+ float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0;
boolean withinFailureFractionLimits =
(failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
boolean withinOutputFailureLimits =
http://git-wip-us.apache.org/repos/asf/tez/blob/c852dbec/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 503e418..5ab68f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1820,6 +1820,7 @@ public class TestTaskAttempt {
doReturn(containers).when(appCtx).getAllContainers();
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+ DAGImpl mockDAG = mock(DAGImpl.class);
TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -1852,6 +1853,14 @@ public class TestTaskAttempt {
EventMetaData mockMeta = mock(EventMetaData.class);
TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+ TezTaskID destTaskID = mock(TezTaskID.class);
+ TezVertexID destVertexID = mock(TezVertexID.class);
+ when(mockDestId1.getTaskID()).thenReturn(destTaskID);
+ when(destTaskID.getVertexID()).thenReturn(destVertexID);
+ Vertex destVertex = mock(VertexImpl.class);
+ when(destVertex.getRunningTasks()).thenReturn(11);
+ when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
+ when(appCtx.getCurrentDAG()).thenReturn(mockDAG);
TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
@@ -1868,7 +1877,14 @@ public class TestTaskAttempt {
// different destination attempt reports error. now threshold crossed
TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
- when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);
+ when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);
+ destTaskID = mock(TezTaskID.class);
+ destVertexID = mock(TezVertexID.class);
+ when(mockDestId2.getTaskID()).thenReturn(destTaskID);
+ when(destTaskID.getVertexID()).thenReturn(destVertexID);
+ destVertex = mock(VertexImpl.class);
+ when(destVertex.getRunningTasks()).thenReturn(11);
+ when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
@@ -1923,6 +1939,7 @@ public class TestTaskAttempt {
mockReEvent = InputReadErrorEvent.create("", 1, 1);
mockMeta = mock(EventMetaData.class);
mockDestId1 = mock(TezTaskAttemptID.class);
+ when(mockDestId1.getTaskID()).thenReturn(destTaskID);
when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
tzEvent = new TezEvent(mockReEvent, mockMeta);
//This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as
@@ -1957,9 +1974,11 @@ public class TestTaskAttempt {
mockReEvent = InputReadErrorEvent.create("", 1, 1);
mockMeta = mock(EventMetaData.class);
mockDestId1 = mock(TezTaskAttemptID.class);
+ when(mockDestId1.getTaskID()).thenReturn(destTaskID);
when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
tzEvent = new TezEvent(mockReEvent, mockMeta);
when(mockClock.getTime()).thenReturn(1000L);
+ when(destVertex.getRunningTasks()).thenReturn(1000);
// time deadline not exceeded for a couple of read error events
taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000));
assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
@@ -1978,6 +1997,93 @@ public class TestTaskAttempt {
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3);
}
+ @Test(timeout = 60000)
+ public void testTAFailureBasedOnRunningTasks() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+ MockEventHandler mockEh = new MockEventHandler();
+ MockEventHandler eventHandler = spy(mockEh);
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+ locationHint = TaskLocationHint.createTaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+ new ContainerContextMatcher(), appCtx);
+ containers.addContainerIfNew(container, 0, 0, 0);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+ HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+ doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+ DAGImpl mockDAG = mock(DAGImpl.class);
+
+ TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+ MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+ taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+ verify(mockHeartbeatHandler).register(taskAttemptID);
+ taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+ TaskAttemptEventType.TA_DONE));
+ assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+ TaskAttemptState.SUCCEEDED);
+ verify(mockHeartbeatHandler).unregister(taskAttemptID);
+
+ int expectedEventsTillSucceeded = 8;
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+ verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
+ verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish
+ DAGHistoryEvent histEvent = histArg.getValue();
+ TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+ long finishTime = finishEvent.getFinishTime();
+ verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
+
+ InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1);
+ EventMetaData mockMeta = mock(EventMetaData.class);
+ TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
+ when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+ TezTaskID destTaskID = mock(TezTaskID.class);
+ TezVertexID destVertexID = mock(TezVertexID.class);
+ when(mockDestId1.getTaskID()).thenReturn(destTaskID);
+ when(destTaskID.getVertexID()).thenReturn(destVertexID);
+ Vertex destVertex = mock(VertexImpl.class);
+ when(destVertex.getRunningTasks()).thenReturn(5);
+ when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
+ when(appCtx.getCurrentDAG()).thenReturn(mockDAG);
+ TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
+
+ // failure threshold is met due to running tasks. state is FAILED
+ assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ }
+
@SuppressWarnings("deprecation")
@Test(timeout = 5000)
public void testKilledInNew() throws ServicePluginException {