You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/10/30 06:11:11 UTC
tez git commit: TEZ-808. Handle task attempts that are not making progress (bikas)
Repository: tez
Updated Branches:
refs/heads/master 9ca694789 -> 414258e40
TEZ-808. Handle task attempts that are not making progress (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/414258e4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/414258e4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/414258e4
Branch: refs/heads/master
Commit: 414258e40ff72961c6d1e6fb287b3d2c9732a7a8
Parents: 9ca6947
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Oct 29 22:10:56 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Oct 29 22:10:56 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 14 +++
.../tez/runtime/api/ProcessorContext.java | 4 +-
.../org/apache/tez/runtime/api/TaskContext.java | 10 ++
.../records/TaskAttemptTerminationCause.java | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 25 +++++
.../apache/tez/dag/app/MockDAGAppMaster.java | 3 +-
.../dag/app/TestTaskCommunicatorManager1.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 98 +++++++++++++++++++-
.../org/apache/tez/runtime/RuntimeTask.java | 11 +++
.../api/events/TaskStatusUpdateEvent.java | 11 ++-
.../api/impl/TezProcessorContextImpl.java | 1 +
.../runtime/api/impl/TezTaskContextImpl.java | 5 +
.../apache/tez/runtime/task/TaskReporter.java | 4 +-
.../TestLogicalIOProcessorRuntimeTask.java | 6 ++
.../runtime/api/impl/TestProcessorContext.java | 8 +-
.../tez/runtime/task/TestTaskReporter.java | 10 ++
.../org/apache/tez/test/TestFaultTolerance.java | 30 +++++-
18 files changed, 232 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 96300fa..03ccdeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2679. Admin forms of launch env settings
ALL CHANGES:
+ TEZ-808. Handle task attempts that are not making progress
TEZ-2553. Tez UI: Tez UI Nits
TEZ-2814. ATSImportTool has a return statement in a finally block
TEZ-2906. Compilation fails with hadoop 2.2.0
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ac3dd4a..0ea8999 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -690,6 +690,20 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+ "max-events-per-heartbeat";
public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
+
+ /**
+ * Long value. Interval, in milliseconds, within which any of the task's Input/Processor/Output
+ * components need to make successive progress notifications. If the progress is not notified
+ * for this interval then the task will be considered hung and terminated.
+ * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS}
+ * and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}.
+ * A config value {@code <= 0} disables this.
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty
+ public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX +
+ "progress.stuck.interval-ms";
+ public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;
/**
* Whether to generate counters per IO or not. Enabling this will rename
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
index 2ac6e4c..8b88289 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
public interface ProcessorContext extends TaskContext {
/**
- * Set the overall progress of this Task Attempt
+ * Set the overall progress of this Task Attempt.
+ * This automatically results in invocation of {@link ProcessorContext#notifyProgress()}
+ * and so invoking that separately is not required.
* @param progress Progress in the range from [0.0 - 1.0f]
*/
public void setProgress(float progress);
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 92d5575..457b0de 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -120,6 +120,16 @@ public interface TaskContext {
* @return {@link ObjectRegistry}
*/
public ObjectRegistry getObjectRegistry();
+
+ /**
+ * Notifies the framework that progress is being made by this component.
+ * This is used to identify hung components that are not making progress.
+ * Must be called periodically until processing has completed for this component.
+ * Care must be taken to call this when real progress has been made. Simply
+ * calling this continuously from a thread without regard to real work may prevent
+ * identification of hung components and delay/stall job completion.
+ */
+ public void notifyProgress();
/**
* Report a fatal error to the framework. This will cause the entire task to
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index 7112d9e..a5214fb 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -34,6 +34,7 @@ public enum TaskAttemptTerminationCause {
INPUT_READ_ERROR, // Failed due to error in reading inputs
OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
OUTPUT_LOST, // Failed because attempts output were reported lost
+ NO_PROGRESS, // Failed because no progress was being made
TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
CONTAINER_LAUNCH_FAILED, // Failed to launch container
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 35a23f9..27eb69b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -191,6 +191,8 @@ public class TaskAttemptImpl implements TaskAttempt,
private DAGCounter localityCounter;
org.apache.tez.runtime.api.impl.TaskStatistics statistics;
+
+ long lastNotifyProgressTimestamp = 0;
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
@@ -1372,6 +1374,29 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.reportedStatus.progress = statusEvent.getProgress();
ta.reportedStatus.counters = statusEvent.getCounters();
ta.statistics = statusEvent.getStatistics();
+ if (statusEvent.getProgressNotified()) {
+ ta.lastNotifyProgressTimestamp = ta.clock.getTime();
+ } else {
+ long currTime = ta.clock.getTime();
+ long hungIntervalMax = ta.conf.getLong(
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT);
+ if (hungIntervalMax > 0 &&
+ currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) {
+ // task is hung
+ String diagnostics = "Attempt failed because it appears to make no progress for " +
+ hungIntervalMax + "ms";
+ LOG.info(diagnostics + " " + ta.getID());
+ // send event that will fail this attempt
+ ta.sendEvent(
+ new TaskAttemptEventAttemptFailed(ta.getID(),
+ TaskAttemptEventType.TA_FAILED,
+ diagnostics,
+ TaskAttemptTerminationCause.NO_PROGRESS)
+ );
+ }
+ }
+
if (sEvent.getReadErrorReported()) {
// if there is a read error then track the next last data event
ta.appendNextDataEvent = true;
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3cab2da..f2130ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -414,7 +414,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
cData.numUpdates++;
float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
- events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
+ events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats, false),
+ new EventMetaData(
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 03b7da9..17fa4d9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -235,7 +235,7 @@ public class TestTaskCommunicatorManager1 {
@Test (timeout = 5000)
public void testTaskEventRouting() throws Exception {
List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
new TezEvent(new TaskAttemptCompletedEvent(), null)
);
@@ -264,7 +264,7 @@ public class TestTaskCommunicatorManager1 {
@Test (timeout = 5000)
public void testTaskEventRoutingWithReadError() throws Exception {
List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
new TezEvent(new TaskAttemptCompletedEvent(), null)
);
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3cf3309..17295cd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -646,7 +647,7 @@ public class TestTaskAttempt {
when(mockTezEvent2.getEventReceivedTime()).thenReturn(ts2);
when(mockTezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(mockId2);
TaskAttemptEventStatusUpdate statusEvent =
- new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null));
+ new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false));
assertEquals(0, taImpl.lastDataEvents.size());
taImpl.setLastEventSent(mockTezEvent1);
@@ -729,7 +730,7 @@ public class TestTaskAttempt {
arg.getAllValues().subList(0,
expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
- taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
+ taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
TaskAttemptTerminationCause.APPLICATION_ERROR));
@@ -769,7 +770,95 @@ public class TestTaskAttempt {
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
}
-
+
+ @Test (timeout = 5000)
+ public void testNoProgressFail() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+ taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75);
+
+ locationHint = TaskLocationHint.createTaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+ new ContainerContextMatcher(), appCtx);
+ containers.addContainerIfNew(container, 0, 0, 0);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+ Clock mockClock = mock(Clock.class);
+ TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, mockClock,
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID = taImpl.getID();
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+ null));
+ assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ verify(mockHeartbeatHandler).register(taskAttemptID);
+
+ when(mockClock.getTime()).thenReturn(100l);
+ taImpl.handle(new TaskAttemptEventStatusUpdate(
+ taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true)));
+ // invocations and time updated
+ assertEquals(100l, taImpl.lastNotifyProgressTimestamp);
+ when(mockClock.getTime()).thenReturn(150l);
+ taImpl.handle(new TaskAttemptEventStatusUpdate(
+ taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, true)));
+ // invocations and time updated
+ assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+ when(mockClock.getTime()).thenReturn(200l);
+ taImpl.handle(new TaskAttemptEventStatusUpdate(
+ taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+ // invocations and time not updated
+ assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+ when(mockClock.getTime()).thenReturn(250l);
+ taImpl.handle(new TaskAttemptEventStatusUpdate(
+ taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+ // invocations and time not updated
+ assertEquals(150l, taImpl.lastNotifyProgressTimestamp);
+ // failed event sent to self
+ verify(eventHandler, atLeast(1)).handle(arg.capture());
+ TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue();
+ assertEquals(taImpl.getID(), fEvent.getTaskAttemptID());
+ assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause());
+ taImpl.handle(fEvent);
+
+ assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(),
+ TaskAttemptStateInternal.FAIL_IN_PROGRESS);
+ verify(mockHeartbeatHandler).unregister(taskAttemptID);
+ assertEquals(1, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taImpl.getTerminationCause());
+ }
+
@Test(timeout = 5000)
public void testEventSerializingHash() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -858,7 +947,8 @@ public class TestTaskAttempt {
arg.getAllValues().subList(0,
expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
- taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
+ taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID,
+ new TaskStatusUpdateEvent(null, 0.1f, null, false)));
taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index c9c6ba1..23e57b1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -54,6 +54,7 @@ public abstract class RuntimeTask {
private final AtomicBoolean taskDone;
private final TaskCounterUpdater counterUpdater;
private final TaskStatistics statistics;
+ private volatile boolean progressNotified;
protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
@@ -104,6 +105,16 @@ public abstract class RuntimeTask {
this.fatalErrorMessage = message;
}
+ public void notifyProgressInvocation() {
+ progressNotified = true;
+ }
+
+ public boolean getAndClearProgressNotification() {
+ boolean retVal = progressNotified;
+ progressNotified = false;
+ return retVal;
+ }
+
public Throwable getFatalError() {
return this.fatalError.get();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
index 6465bed..518cbf4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -31,15 +31,18 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
private TezCounters tezCounters;
private float progress;
+ boolean progressNotified;
private TaskStatistics statistics;
public TaskStatusUpdateEvent() {
}
- public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics) {
+ public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics,
+ boolean progressNotified) {
this.tezCounters = tezCounters;
this.progress = progress;
this.statistics = statistics;
+ this.progressNotified = progressNotified;
}
public TezCounters getCounters() {
@@ -53,10 +56,15 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
public TaskStatistics getStatistics() {
return statistics;
}
+
+ public boolean getProgressNotified() {
+ return progressNotified;
+ }
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(progress);
+ out.writeBoolean(progressNotified);
if (tezCounters != null) {
out.writeBoolean(true);
tezCounters.write(out);
@@ -74,6 +82,7 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
progress = in.readFloat();
+ progressNotified = in.readBoolean();
if (in.readBoolean()) {
tezCounters = new TezCounters();
tezCounters.readFields(in);
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 6dc30ff..0c3283d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -94,6 +94,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
@Override
public void setProgress(float progress) {
runtimeTask.setProgress(progress);
+ notifyProgress();
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 5f04c80..211f9d7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -174,6 +174,11 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable {
}
@Override
+ public void notifyProgress() {
+ runtimeTask.notifyProgressInvocation();
+ }
+
+ @Override
public ByteBuffer getServiceConsumerMetaData(String serviceName) {
return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
.asReadOnlyBuffer().rewind();
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 263300e..30a1b9c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -347,15 +347,17 @@ public class TaskReporter implements TaskReporterInterface {
TezCounters counters = null;
TaskStatistics stats = null;
float progress = 0;
+ boolean progressNotified = false;
if (task.hasInitialized()) {
progress = task.getProgress();
+ progressNotified = task.getAndClearProgressNotification();
if (sendCounters) {
// send these potentially large objects at longer intervals to avoid overloading the AM
counters = task.getCounters();
stats = task.getTaskStatistics();
}
}
- return new TaskStatusUpdateEvent(counters, progress, stats);
+ return new TaskStatusUpdateEvent(counters, progress, stats, progressNotified);
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 0fc3919..12fec7e 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -93,6 +93,9 @@ public class TestLogicalIOProcessorRuntimeTask {
assertEquals(1, TestProcessor.runCount);
assertEquals(1, TestInput.startCount);
assertEquals(0, TestOutput.startCount);
+ // test that progress notifications are recorded and then cleared on read
+ assertEquals(true, lio1.getAndClearProgressNotification());
+ assertEquals(false, lio1.getAndClearProgressNotification()); // cleared after getting
assertEquals(30, TestInput.vertexParallelism);
assertEquals(0, TestOutput.vertexParallelism);
assertEquals(30, lio1.getProcessorContext().getVertexParallelism());
@@ -237,6 +240,7 @@ public class TestLogicalIOProcessorRuntimeTask {
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
throws Exception {
runCount++;
+ getContext().notifyProgress();
}
@Override
@@ -271,6 +275,7 @@ public class TestLogicalIOProcessorRuntimeTask {
public void start() throws Exception {
startCount++;
this.vertexParallelism = getContext().getVertexParallelism();
+ getContext().notifyProgress();
}
@Override
@@ -310,6 +315,7 @@ public class TestLogicalIOProcessorRuntimeTask {
System.err.println("Out started");
startCount++;
this.vertexParallelism = getContext().getVertexParallelism();
+ getContext().notifyProgress();
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
index e28df3a..ff94e7f 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java
@@ -17,8 +17,7 @@ package org.apache.tez.runtime.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -97,6 +96,9 @@ public class TestProcessorContext {
assertEquals(vertexName, procContext.getTaskVertexName());
assertEquals(vertexId.getId(), procContext.getTaskVertexIndex());
assertTrue(Arrays.equals(localDirs, procContext.getWorkDirs()));
-
+
+ // test auto call of notifyProgress
+ procContext.setProgress(0.1f);
+ verify(runtimeTask, times(1)).notifyProgressInvocation();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index b44c9f8..e137d50 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -116,6 +116,8 @@ public class TestTaskReporter {
LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
doReturn("vertexName").when(mockTask).getVertexName();
doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+ boolean progressNotified = false;
+ doReturn(progressNotified).when(mockTask).getAndClearProgressNotification();
TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
float progress = 0.5f;
@@ -136,9 +138,11 @@ public class TestTaskReporter {
TaskStatusUpdateEvent event = heartbeatCallable.getStatusUpdateEvent(true);
verify(mockTask, times(1)).hasInitialized();
verify(mockTask, times(0)).getProgress();
+ verify(mockTask, times(0)).getAndClearProgressNotification();
verify(mockTask, times(0)).getTaskStatistics();
verify(mockTask, times(0)).getCounters();
Assert.assertEquals(0, event.getProgress(), 0);
+ Assert.assertEquals(false, event.getProgressNotified());
Assert.assertNull(event.getCounters());
Assert.assertNull(event.getStatistics());
@@ -147,20 +151,26 @@ public class TestTaskReporter {
event = heartbeatCallable.getStatusUpdateEvent(false);
verify(mockTask, times(2)).hasInitialized();
verify(mockTask, times(1)).getProgress();
+ verify(mockTask, times(1)).getAndClearProgressNotification();
verify(mockTask, times(0)).getTaskStatistics();
verify(mockTask, times(0)).getCounters();
Assert.assertEquals(progress, event.getProgress(), 0);
+ Assert.assertEquals(progressNotified, event.getProgressNotified());
Assert.assertNull(event.getCounters());
Assert.assertNull(event.getStatistics());
// task is initialized - progress obtained and also counters since flag is true
+ progressNotified = true;
+ doReturn(progressNotified).when(mockTask).getAndClearProgressNotification();
doReturn(true).when(mockTask).hasInitialized();
event = heartbeatCallable.getStatusUpdateEvent(true);
verify(mockTask, times(3)).hasInitialized();
verify(mockTask, times(2)).getProgress();
+ verify(mockTask, times(2)).getAndClearProgressNotification();
verify(mockTask, times(1)).getTaskStatistics();
verify(mockTask, times(1)).getCounters();
Assert.assertEquals(progress, event.getProgress(), 0);
+ Assert.assertEquals(progressNotified, event.getProgressNotified());
Assert.assertEquals(counters, event.getCounters());
Assert.assertEquals(stats, event.getStatistics());
http://git-wip-us.apache.org/repos/asf/tez/blob/414258e4/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index ec89c4b..011e91d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -23,6 +23,9 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -90,6 +93,7 @@ public class TestFaultTolerance {
tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
tezConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.4);
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, 3);
+ tezConf.setInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, 100);
tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
tezSession.start();
@@ -117,6 +121,11 @@ public class TestFaultTolerance {
}
void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts) throws Exception {
+ runDAGAndVerify(dag, finalState, checkFailedAttempts, null);
+ }
+
+ void runDAGAndVerify(DAG dag, DAGStatus.State finalState, int checkFailedAttempts,
+ String diagnostics) throws Exception {
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -129,12 +138,17 @@ public class TestFaultTolerance {
dagStatus = dagClient.getDAGStatus(null);
}
+ Assert.assertEquals(finalState, dagStatus.getState());
+
if (checkFailedAttempts > 0) {
Assert.assertEquals(checkFailedAttempts,
dagStatus.getDAGProgress().getFailedTaskAttemptCount());
}
- Assert.assertEquals(finalState, dagStatus.getState());
+ if (diagnostics != null) {
+ Assert.assertNotNull(dagStatus.getDiagnostics());
+ Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics));
+ }
}
@Test (timeout=60000)
@@ -749,4 +763,18 @@ public class TestFaultTolerance {
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
+ @Test (timeout=240000)
+ public void testNoProgress() throws Exception {
+ Configuration testConf = new Configuration(false);
+ testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep
+ testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+ DAG dag = SimpleTestDAG.createDAG(testConf);
+ Vertex hung = dag.getVertex("v1");
+ hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000));
+ hung.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, Integer.toString(2));
+
+ // dag will fail with 2 attempts failing from vertex v1
+ runDAGAndVerify(dag, DAGStatus.State.FAILED, 2, "no progress");
+ }
+
}