You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/18 17:04:04 UTC
tez git commit: TEZ-2552. CRC errors can cause job to run for very
long time in large jobs (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 24fb21e65 -> 41f8a9737
TEZ-2552. CRC errors can cause job to run for very long time in large jobs (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/41f8a973
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/41f8a973
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/41f8a973
Branch: refs/heads/master
Commit: 41f8a9737a6d2424f0453db3e93404323b248d85
Parents: 24fb21e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Jun 18 20:35:40 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Jun 18 20:35:40 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 24 ++++++++++++
tez-dag/findbugs-exclude.xml | 10 +++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 29 +++++++++++---
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 41 ++++++++++++++++++--
5 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7629232..a331b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Apache Tez Change Log
Release 0.8.0: Unreleased
INCOMPATIBLE CHANGES
+ TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 15b1333..88b1dee 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -208,6 +208,30 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";
/**
+ * Double value. Represents the ratio of unique failed outputs to the number of consumer
+ * tasks. When this ratio is exceeded, or the limit set by {@link
+ * #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES} is reached, the task is declared as failed by the AM.
+ *
+ * Expert level setting.
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION =
+ TEZ_TASK_PREFIX + "max.allowed.output.failures.fraction";
+ public static final double TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT = 0.1;
+
+ /**
+ * Int value. Represents the number of unique output failures at which a task is
+ * declared as failed by the AM. When this limit is reached, or the ratio set by {@link
+ * #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION} is exceeded, the AM fails and relaunches the task.
+ *
+ * Expert level setting.
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES =
+ TEZ_TASK_PREFIX + "max.allowed.output.failures";
+ public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
+
+ /**
* Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
* output specific operation and typically involves making the output visible for consumption.
* If the config is true, then the outputs are committed at the end of DAG completion after all
http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index c89a9d2..ab7ae5c 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -229,4 +229,14 @@
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
+ <!-- TEZ-2552 -->
+ <Match>
+ <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/>
+ <Or>
+ <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
+ <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
+ </Or>
+ <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 9e47287..f015155 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -155,10 +155,11 @@ public class TaskAttemptImpl implements TaskAttempt,
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
Set<String> taskRacks = new HashSet<String>();
-
+
private Set<TezTaskAttemptID> uniquefailedOutputReports =
new HashSet<TezTaskAttemptID>();
- private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.25;
+ private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
+ private static int MAX_ALLOWED_OUTPUT_FAILURES;
protected final boolean isRescheduled;
private final Resource taskResource;
@@ -373,6 +374,14 @@ public class TaskAttemptImpl implements TaskAttempt,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Task task) {
+
+ this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+ .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
+ .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
+
+ this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+ .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
+ .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -1408,15 +1417,23 @@ public class TaskAttemptImpl implements TaskAttempt,
attempt.uniquefailedOutputReports.add(failedDestTaId);
float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
/ outputFailedEvent.getConsumerTaskNumber();
-
- // If needed we can also use the absolute number of reported output errors
+
+ boolean withinFailureFractionLimits =
+ (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
+ boolean withinOutputFailureLimits =
+ (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES);
+
// If needed we can launch a background task without failing this task
// to generate a copy of the output just in case.
// If needed we can consider only running consumer tasks
- if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
+ if (withinFailureFractionLimits && withinOutputFailureLimits) {
return attempt.getInternalState();
}
- String message = attempt.getID() + " being failed for too many output errors";
+ String message = attempt.getID() + " being failed for too many output errors. "
+ + "failureFraction=" + failureFraction + ", "
+ + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", "
+ + "uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + ", "
+ + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES;
LOG.info(message);
attempt.addDiagnosticInfo(message);
// send input failed event
http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 15d5609..d21f715 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1169,14 +1169,14 @@ public class TestTaskAttempt {
TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
- taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
// failure threshold not met. state is SUCCEEDED
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
// sending same error again doesn't change anything
- taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
// default value of error cause
@@ -1185,7 +1185,7 @@ public class TestTaskAttempt {
// different destination attempt reports error. now threshold crossed
TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);
- taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+ taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
@@ -1210,6 +1210,41 @@ public class TestTaskAttempt {
// No new events.
verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
arg.capture());
+
+ taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1);
+ TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
+ MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mockHeartbeatHandler, appCtx, false,
+ resource, createFakeContainerContext(), false);
+ TezTaskAttemptID taskAttemptID2 = taImpl2.getID();
+
+ taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0));
+ // At state STARTING.
+ taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2, contId, null));
+ verify(mockHeartbeatHandler).register(taskAttemptID2);
+ taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE));
+ assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+ TaskAttemptState.SUCCEEDED);
+ verify(mockHeartbeatHandler).unregister(taskAttemptID2);
+
+ mockReEvent = InputReadErrorEvent.create("", 1, 1);
+ mockMeta = mock(EventMetaData.class);
+ mockDestId1 = mock(TezTaskAttemptID.class);
+ when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+ tzEvent = new TezEvent(mockReEvent, mockMeta);
+ // This is expected to fail because MAX_ALLOWED_OUTPUT_FAILURES (set to 1 above) is reached.
+ // NOTE(review): 1/8 = 0.125 also exceeds the default fraction of 0.1 unless taskConf overrides it.
+ taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8));
+ assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+ TaskAttemptState.FAILED);
+
+ assertEquals("Task attempt is not in FAILED state", taImpl2.getState(),
+ TaskAttemptState.FAILED);
+ assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause());
+ // verify unregister is not invoked again
+ verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2);
+
}
@SuppressWarnings("deprecation")